b=21776 Use atomic memory allocation when VM is flushing to free memory.
lustre/osc/osc_request.c (fs/lustre-release.git)
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
        }

        RETURN(lmm_size);
}
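
/*
 * Usage sketch (illustrative, not part of this file): callers follow the
 * usual obd_packmd() calling convention that the code above implements,
 * querying the size first, then packing, then freeing:
 *
 *      int lmm_size = osc_packmd(exp, NULL, lsm);    // size query only
 *      struct lov_mds_md *lmm = NULL;
 *      int rc = osc_packmd(exp, &lmm, lsm);          // allocates and packs
 *      ...
 *      osc_packmd(exp, &lmm, NULL);                  // frees the buffer
 */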

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_seq = le64_to_cpu(lmm->lmm_object_seq);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
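
/*
 * Illustrative caller pattern (assumed, not from this file): the async
 * variant is driven through a ptlrpc request set supplied by the caller:
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *      if (set == NULL)
 *              return -ENOMEM;
 *      rc = osc_getattr_async(exp, oinfo, set);
 *      if (rc == 0)
 *              rc = ptlrpc_set_wait(set);
 *      ptlrpc_set_destroy(set);
 *
 * osc_getattr_interpret() above then runs as each reply arrives and
 * invokes the oi_cb_up() completion callback.
 */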

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PSCOPE_OTHER);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
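
/*
 * Illustrative use of the size/blocks overload above (assumed caller, not
 * from this file): syncing an entire object passes the [0, EOF) extent,
 * e.g. osc_sync(exp, oa, md, 0, OBD_OBJECT_EOF, NULL), so the OST flushes
 * everything it holds for that object.
 */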

/* Find and cancel locally the locks matched by @mode on the resource
 * identified by @objid.  Matched locks are added to the @cancels list.
 * Returns the number of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. the transaction
 * committed).  If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * next reconnects to the MDS, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing the
 * destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs
                         * drops below max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read()s and the cfs_atomic_inc()s are not
                 * covered by a lock, so they may race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
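
/*
 * Worked example of the o_undirty sizing above (illustrative numbers, not
 * from this file): with 4KB pages (CFS_PAGE_SHIFT == 12),
 * cl_max_pages_per_rpc == 256 and cl_max_rpcs_in_flight == 8,
 * max_in_flight = (256 << 12) * 9 = 9MB, so the client asks the OST to
 * keep max(cl_dirty_max, 9MB) of grant available to it.
 */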

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
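
/*
 * Worked example of the short-write rounding above (illustrative numbers):
 * with CFS_PAGE_SIZE == 4096 and an OST blocksize of 1024, a write of
 * count == 1024 at in-page offset 512 touches OST blocks [0, 2048), so
 * count is rounded up to 2048 and cl_lost_grant grows by
 * 4096 - 2048 = 2048 bytes.
 */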

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
                     obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if the cache is still dirty but we have no grant, wait for
                 * pending RPCs that may yet return us some grant before
                 * falling back to sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_FREE_PTR(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int              rc = 0;
        struct ost_body *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
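
/*
 * Numeric sketch of the handoff above (illustrative values): if
 * cl_avail_grant is 64MB and the computed target is 16MB, the client keeps
 * 16MB, reports o_grant = 48MB back to the OST through the
 * KEY_GRANT_SHRINK set_info RPC, and restores the 48MB locally if that
 * RPC fails.
 */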

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it is the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it is
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we are evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file; it's just that
 * this stripe has never been written at or beyond this offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
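
/*
 * Example of the zeroing above (illustrative numbers): a 3-page read of
 * 12288 bytes that transfers only nob_read == 5000 leaves page 0 intact
 * (its 4096 bytes all read OK), zeroes bytes 904..4095 of page 1 (EOF
 * falls inside that page), and zeroes page 2 entirely.
 */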

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        __s32  *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error; note the rcs must be
         * signed or the "< 0" test can never succeed */
        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
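
/*
 * Example (illustrative): two 4096-byte pages at file offsets 0 and 4096
 * with identical flags satisfy p1->off + p1->count == p2->off, so the BRW
 * setup code below merges them into a single niobuf; a gap between pages,
 * or a flag difference outside the masked bits, starts a new niobuf
 * instead.
 */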

static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* When sending, we only compute a wrong checksum instead of
         * corrupting the data, so it is still correct on a resend */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1415
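/* Verify a server-reported write checksum: recompute the bulk checksum
 * locally and diagnose where the data most likely changed.  Returns 0 if
 * the client and server checksums match, 1 on mismatch. */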
1416 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1417                                 __u32 client_cksum, __u32 server_cksum, int nob,
1418                                 obd_count page_count, struct brw_page **pga,
1419                                 cksum_type_t client_cksum_type)
1420 {
1421         __u32 new_cksum;
1422         char *msg;
1423         cksum_type_t cksum_type;
1424
1425         if (server_cksum == client_cksum) {
1426                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1427                 return 0;
1428         }
1429
1430         if (oa->o_valid & OBD_MD_FLFLAGS)
1431                 cksum_type = cksum_type_unpack(oa->o_flags);
1432         else
1433                 cksum_type = OBD_CKSUM_CRC32;
1434
1435         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1436                                       cksum_type);
1437
1438         if (cksum_type != client_cksum_type)
1439                 msg = "the server did not use the checksum type specified in "
1440                       "the original request - likely a protocol problem";
1441         else if (new_cksum == server_cksum)
1442                 msg = "changed on the client after we checksummed it - "
1443                       "likely false positive due to mmap IO (bug 11742)";
1444         else if (new_cksum == client_cksum)
1445                 msg = "changed in transit before arrival at OST";
1446         else
1447                 msg = "changed in transit AND doesn't match the original - "
1448                       "likely false positive due to mmap IO (bug 11742)";
1449
1450         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1451                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1452                            msg, libcfs_nid2str(peer->nid),
1453                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1454                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1455                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1456                            oa->o_id,
1457                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1458                            pga[0]->off,
1459                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1460         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1461                "client csum now %x\n", client_cksum, client_cksum_type,
1462                server_cksum, cksum_type, new_cksum);
1463         return 1;
1464 }
1465
/* Note: rc enters this function as the number of bytes transferred */
1467 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1468 {
1469         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1470         const lnet_process_id_t *peer =
1471                         &req->rq_import->imp_connection->c_peer;
1472         struct client_obd *cli = aa->aa_cli;
1473         struct ost_body *body;
1474         __u32 client_cksum = 0;
1475         ENTRY;
1476
1477         if (rc < 0 && rc != -EDQUOT) {
1478                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1479                 RETURN(rc);
1480         }
1481
1482         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1483         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1484         if (body == NULL) {
1485                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1486                 RETURN(-EPROTO);
1487         }
1488
1489 #ifdef HAVE_QUOTA_SUPPORT
1490         /* set/clear over quota flag for a uid/gid */
1491         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1492             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1493                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1494
1495                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1496                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1497                        body->oa.o_flags);
1498                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1499                              body->oa.o_flags);
1500         }
1501 #endif
1502
1503         osc_update_grant(cli, body);
1504
1505         if (rc < 0)
1506                 RETURN(rc);
1507
1508         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1509                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1510
1511         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1512                 if (rc > 0) {
1513                         CERROR("Unexpected +ve rc %d\n", rc);
1514                         RETURN(-EPROTO);
1515                 }
1516                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1517
1518                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1519                         RETURN(-EAGAIN);
1520
1521                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1522                     check_write_checksum(&body->oa, peer, client_cksum,
1523                                          body->oa.o_cksum, aa->aa_requested_nob,
1524                                          aa->aa_page_count, aa->aa_ppga,
1525                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1526                         RETURN(-EAGAIN);
1527
1528                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1529                                      aa->aa_page_count, aa->aa_ppga);
1530                 GOTO(out, rc);
1531         }
1532
1533         /* The rest of this function executes only for OST_READs */
1534
1535         /* if unwrap_bulk failed, return -EAGAIN to retry */
1536         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1537         if (rc < 0)
1538                 GOTO(out, rc = -EAGAIN);
1539
1540         if (rc > aa->aa_requested_nob) {
1541                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1542                        aa->aa_requested_nob);
1543                 RETURN(-EPROTO);
1544         }
1545
        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                RETURN(-EPROTO);
        }
1551
1552         if (rc < aa->aa_requested_nob)
1553                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1554
1555         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1556                 static int cksum_counter;
1557                 __u32      server_cksum = body->oa.o_cksum;
1558                 char      *via;
1559                 char      *router;
1560                 cksum_type_t cksum_type;
1561
1562                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1563                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1564                 else
1565                         cksum_type = OBD_CKSUM_CRC32;
1566                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1567                                                  aa->aa_ppga, OST_READ,
1568                                                  cksum_type);
1569
1570                 if (peer->nid == req->rq_bulk->bd_sender) {
1571                         via = router = "";
1572                 } else {
1573                         via = " via ";
1574                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1575                 }
1576
1577                 if (server_cksum == ~0 && rc > 0) {
1578                         CERROR("Protocol error: server %s set the 'checksum' "
1579                                "bit, but didn't send a checksum.  Not fatal, "
1580                                "but please notify on http://bugzilla.lustre.org/\n",
1581                                libcfs_nid2str(peer->nid));
1582                 } else if (server_cksum != client_cksum) {
1583                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1584                                            "%s%s%s inode "DFID" object "
1585                                            LPU64"/"LPU64" extent "
1586                                            "["LPU64"-"LPU64"]\n",
1587                                            req->rq_import->imp_obd->obd_name,
1588                                            libcfs_nid2str(peer->nid),
1589                                            via, router,
1590                                            body->oa.o_valid & OBD_MD_FLFID ?
1591                                                 body->oa.o_parent_seq : (__u64)0,
1592                                            body->oa.o_valid & OBD_MD_FLFID ?
1593                                                 body->oa.o_parent_oid : 0,
1594                                            body->oa.o_valid & OBD_MD_FLFID ?
1595                                                 body->oa.o_parent_ver : 0,
1596                                            body->oa.o_id,
1597                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1598                                                 body->oa.o_seq : (__u64)0,
1599                                            aa->aa_ppga[0]->off,
1600                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1601                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1602                                                                         1);
1603                         CERROR("client %x, server %x, cksum_type %x\n",
1604                                client_cksum, server_cksum, cksum_type);
1605                         cksum_counter = 0;
1606                         aa->aa_oa->o_cksum = client_cksum;
1607                         rc = -EAGAIN;
1608                 } else {
1609                         cksum_counter++;
1610                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1611                         rc = 0;
1612                 }
1613         } else if (unlikely(client_cksum)) {
1614                 static int cksum_missed;
1615
1616                 cksum_missed++;
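                /* log only when cksum_missed is a power of two, to avoid
                 * flooding the console */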
1617                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1618                         CERROR("Checksum %u requested from %s but not sent\n",
1619                                cksum_missed, libcfs_nid2str(peer->nid));
1620         } else {
1621                 rc = 0;
1622         }
1623 out:
1624         if (rc >= 0)
1625                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1626
1627         RETURN(rc);
1628 }
1629
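/* Issue a single synchronous BRW RPC and wait for it to complete,
 * rebuilding and resending the request on bulk timeouts and other
 * recoverable errors, with a wait that grows with the resend count. */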
1630 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1631                             struct lov_stripe_md *lsm,
1632                             obd_count page_count, struct brw_page **pga,
1633                             struct obd_capa *ocapa)
1634 {
1635         struct ptlrpc_request *req;
1636         int                    rc;
1637         cfs_waitq_t            waitq;
1638         int                    resends = 0;
1639         struct l_wait_info     lwi;
1640
1641         ENTRY;
1642
1643         cfs_waitq_init(&waitq);
1644
1645 restart_bulk:
1646         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1647                                   page_count, pga, &req, ocapa, 0);
1648         if (rc != 0)
1649                 return (rc);
1650
1651         rc = ptlrpc_queue_wait(req);
1652
1653         if (rc == -ETIMEDOUT && req->rq_resend) {
1654                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1655                 ptlrpc_req_finished(req);
1656                 goto restart_bulk;
1657         }
1658
1659         rc = osc_brw_fini_request(req, rc);
1660
1661         ptlrpc_req_finished(req);
1662         if (osc_recoverable_error(rc)) {
1663                 resends++;
1664                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1665                         CERROR("too many resend retries, returning error\n");
1666                         RETURN(-EIO);
1667                 }
1668
1669                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1670                 l_wait_event(waitq, 0, &lwi);
1671
1672                 goto restart_bulk;
1673         }
1674
1675         RETURN (rc);
1676 }
1677
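/* Rebuild a BRW request that failed with a recoverable error and queue the
 * replacement on the same request set; the new request takes over the pga
 * and oaps of the old one. */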
1678 int osc_brw_redo_request(struct ptlrpc_request *request,
1679                          struct osc_brw_async_args *aa)
1680 {
1681         struct ptlrpc_request *new_req;
1682         struct ptlrpc_request_set *set = request->rq_set;
1683         struct osc_brw_async_args *new_aa;
1684         struct osc_async_page *oap;
1685         int rc = 0;
1686         ENTRY;
1687
1688         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1689                 CERROR("too many resent retries, returning error\n");
1690                 RETURN(-EIO);
1691         }
1692
1693         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1694
1695         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1696                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1697                                   aa->aa_cli, aa->aa_oa,
1698                                   NULL /* lsm unused by osc currently */,
1699                                   aa->aa_page_count, aa->aa_ppga,
1700                                   &new_req, aa->aa_ocapa, 0);
1701         if (rc)
1702                 RETURN(rc);
1703
1704         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1705
1706         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1707                 if (oap->oap_request != NULL) {
1708                         LASSERTF(request == oap->oap_request,
1709                                  "request %p != oap_request %p\n",
1710                                  request, oap->oap_request);
1711                         if (oap->oap_interrupted) {
1712                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1713                                 ptlrpc_req_finished(new_req);
1714                                 RETURN(-EINTR);
1715                         }
1716                 }
1717         }
1718         /* New request takes over pga and oaps from old request.
1719          * Note that copying a list_head doesn't work, need to move it... */
1720         aa->aa_resends++;
1721         new_req->rq_interpret_reply = request->rq_interpret_reply;
1722         new_req->rq_async_args = request->rq_async_args;
1723         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1724
1725         new_aa = ptlrpc_req_async_args(new_req);
1726
1727         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1728         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1729         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1730
1731         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1732                 if (oap->oap_request) {
1733                         ptlrpc_req_finished(oap->oap_request);
1734                         oap->oap_request = ptlrpc_request_addref(new_req);
1735                 }
1736         }
1737
1738         new_aa->aa_ocapa = aa->aa_ocapa;
1739         aa->aa_ocapa = NULL;
1740
        /* using ptlrpc_set_add_req() here is safe because interpret
         * functions run in check_set context.  the only path by which
         * another thread can access this request and return -EINTR is
         * protected by cl_loi_list_lock */
1745         ptlrpc_set_add_req(set, new_req);
1746
1747         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1748
1749         DEBUG_REQ(D_INFO, new_req, "new request");
1750         RETURN(0);
1751 }
1752
1753 /*
 * ugh, we want disk allocation on the target to happen in offset order.  we'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation.  it's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's 1 and the array is sorted.
1759  */
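/* e.g. for num = 100 the stride sequence grows 1, 4, 13, 40, 121; the
 * sorting passes below then use strides 40, 13, 4 and finally 1 */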
1760 static void sort_brw_pages(struct brw_page **array, int num)
1761 {
1762         int stride, i, j;
1763         struct brw_page *tmp;
1764
1765         if (num == 1)
1766                 return;
1767         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1768                 ;
1769
1770         do {
1771                 stride /= 3;
1772                 for (i = stride ; i < num ; i++) {
1773                         tmp = array[i];
1774                         j = i;
1775                         while (j >= stride && array[j - stride]->off > tmp->off) {
1776                                 array[j] = array[j - stride];
1777                                 j -= stride;
1778                         }
1779                         array[j] = tmp;
1780                 }
1781         } while (stride > 1);
1782 }
1783
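/* Count the leading pages that can be sent without fragmenting the RDMA:
 * every page except the last must end on a page boundary, and every page
 * except the first must start on one. */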
1784 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1785 {
1786         int count = 1;
1787         int offset;
1788         int i = 0;
1789
        LASSERT(pages > 0);
1791         offset = pg[i]->off & ~CFS_PAGE_MASK;
1792
1793         for (;;) {
1794                 pages--;
1795                 if (pages == 0)         /* that's all */
1796                         return count;
1797
1798                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1799                         return count;   /* doesn't end on page boundary */
1800
1801                 i++;
1802                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1803                 if (offset != 0)        /* doesn't start on page boundary */
1804                         return count;
1805
1806                 count++;
1807         }
1808 }
1809
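/* Build an array of pointers into the flat brw_page array so the pages can
 * be sorted and split into per-RPC chunks without copying them. */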
1810 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1811 {
1812         struct brw_page **ppga;
1813         int i;
1814
1815         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1816         if (ppga == NULL)
1817                 return NULL;
1818
1819         for (i = 0; i < count; i++)
1820                 ppga[i] = pga + i;
1821         return ppga;
1822 }
1823
1824 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1825 {
1826         LASSERT(ppga != NULL);
1827         OBD_FREE(ppga, sizeof(*ppga) * count);
1828 }
1829
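/* Synchronous BRW entry point: sort the pages by offset and issue them in
 * chunks of at most cl_max_pages_per_rpc unfragmented pages, saving a copy
 * of the oa across chunks since each BRW clobbers it. */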
1830 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1831                    obd_count page_count, struct brw_page *pga,
1832                    struct obd_trans_info *oti)
1833 {
1834         struct obdo *saved_oa = NULL;
1835         struct brw_page **ppga, **orig;
1836         struct obd_import *imp = class_exp2cliimp(exp);
1837         struct client_obd *cli;
1838         int rc, page_count_orig;
1839         ENTRY;
1840
1841         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1842         cli = &imp->imp_obd->u.cli;
1843
1844         if (cmd & OBD_BRW_CHECK) {
1845                 /* The caller just wants to know if there's a chance that this
1846                  * I/O can succeed */
1847
1848                 if (imp->imp_invalid)
1849                         RETURN(-EIO);
1850                 RETURN(0);
1851         }
1852
1853         /* test_brw with a failed create can trip this, maybe others. */
1854         LASSERT(cli->cl_max_pages_per_rpc);
1855
1856         rc = 0;
1857
1858         orig = ppga = osc_build_ppga(pga, page_count);
1859         if (ppga == NULL)
1860                 RETURN(-ENOMEM);
1861         page_count_orig = page_count;
1862
1863         sort_brw_pages(ppga, page_count);
1864         while (page_count) {
1865                 obd_count pages_per_brw;
1866
1867                 if (page_count > cli->cl_max_pages_per_rpc)
1868                         pages_per_brw = cli->cl_max_pages_per_rpc;
1869                 else
1870                         pages_per_brw = page_count;
1871
1872                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1873
1874                 if (saved_oa != NULL) {
1875                         /* restore previously saved oa */
1876                         *oinfo->oi_oa = *saved_oa;
1877                 } else if (page_count > pages_per_brw) {
1878                         /* save a copy of oa (brw will clobber it) */
1879                         OBDO_ALLOC(saved_oa);
1880                         if (saved_oa == NULL)
1881                                 GOTO(out, rc = -ENOMEM);
1882                         *saved_oa = *oinfo->oi_oa;
1883                 }
1884
1885                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1886                                       pages_per_brw, ppga, oinfo->oi_capa);
1887
1888                 if (rc != 0)
1889                         break;
1890
1891                 page_count -= pages_per_brw;
1892                 ppga += pages_per_brw;
1893         }
1894
1895 out:
1896         osc_release_ppga(orig, page_count_orig);
1897
1898         if (saved_oa != NULL)
1899                 OBDO_FREE(saved_oa);
1900
1901         RETURN(rc);
1902 }
1903
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting: either writeback completed, or a truncate happened
 * before writing started.  Must be called with the loi lock held. */
1907 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1908                            int sent)
1909 {
1910         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1911 }
1912
1913
/* Decide whether the pending pages to read/write for a given object (lop)
 * warrant an RPC right now.  This is used by osc_check_rpcs->osc_next_loi()
 * and loi_list_maint() to quickly find objects that are ready to send an
 * RPC. */
1917 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1918                          int cmd)
1919 {
1920         int optimal;
1921         ENTRY;
1922
1923         if (lop->lop_num_pending == 0)
1924                 RETURN(0);
1925
        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting the llite pages */
1930         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1931                 RETURN(1);
1932
        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
1937         if (!cfs_list_empty(&lop->lop_urgent)) {
1938                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1939                 RETURN(1);
1940         }
1941         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1942         optimal = cli->cl_max_pages_per_rpc;
1943         if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting. */
1947                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1948                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1949                         RETURN(1);
1950                 }
1951                 /* +16 to avoid triggering rpcs that would want to include pages
1952                  * that are being queued but which can't be made ready until
1953                  * the queuer finishes with the page. this is a wart for
1954                  * llite::commit_write() */
1955                 optimal += 16;
1956         }
1957         if (lop->lop_num_pending >= optimal)
1958                 RETURN(1);
1959
1960         RETURN(0);
1961 }
1962
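/* A high-priority RPC is needed when the first urgent page on this lop is
 * flagged ASYNC_HP. */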
1963 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1964 {
1965         struct osc_async_page *oap;
1966         ENTRY;
1967
1968         if (cfs_list_empty(&lop->lop_urgent))
1969                 RETURN(0);
1970
1971         oap = cfs_list_entry(lop->lop_urgent.next,
1972                          struct osc_async_page, oap_urgent_item);
1973
1974         if (oap->oap_async_flags & ASYNC_HP) {
1975                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1976                 RETURN(1);
1977         }
1978
1979         RETURN(0);
1980 }
1981
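/* Make the list membership of @item match @should_be_on: add it to @list
 * when it should be on and isn't, remove it when it is but shouldn't be. */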
1982 static void on_list(cfs_list_t *item, cfs_list_t *list,
1983                     int should_be_on)
1984 {
1985         if (cfs_list_empty(item) && should_be_on)
1986                 cfs_list_add_tail(item, list);
1987         else if (!cfs_list_empty(item) && !should_be_on)
1988                 cfs_list_del_init(item);
1989 }
1990
1991 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1992  * can find pages to build into rpcs quickly */
1993 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1994 {
1995         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1996             lop_makes_hprpc(&loi->loi_read_lop)) {
1997                 /* HP rpc */
1998                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1999                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2000         } else {
2001                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2002                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2003                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2004                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2005         }
2006
2007         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2008                 loi->loi_write_lop.lop_num_pending);
2009
2010         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2011                 loi->loi_read_lop.lop_num_pending);
2012 }
2013
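/* Adjust the pending-page counters on both the per-object lop and the
 * client_obd as pages are queued (delta > 0) or dequeued (delta < 0). */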
2014 static void lop_update_pending(struct client_obd *cli,
2015                                struct loi_oap_pages *lop, int cmd, int delta)
2016 {
2017         lop->lop_num_pending += delta;
2018         if (cmd & OBD_BRW_WRITE)
2019                 cli->cl_pending_w_pages += delta;
2020         else
2021                 cli->cl_pending_r_pages += delta;
2022 }
2023
2024 /**
2025  * this is called when a sync waiter receives an interruption.  Its job is to
2026  * get the caller woken as soon as possible.  If its page hasn't been put in an
2027  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2028  * desiring interruption which will forcefully complete the rpc once the rpc
2029  * has timed out.
2030  */
2031 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2032 {
2033         struct loi_oap_pages *lop;
2034         struct lov_oinfo *loi;
2035         int rc = -EBUSY;
2036         ENTRY;
2037
2038         LASSERT(!oap->oap_interrupted);
2039         oap->oap_interrupted = 1;
2040
        /* if it has been put in an rpc, mark the rpc interrupted and drop
         * our reference; only one oap gets a request reference */
2042         if (oap->oap_request != NULL) {
2043                 ptlrpc_mark_interrupted(oap->oap_request);
2044                 ptlrpcd_wake(oap->oap_request);
2045                 ptlrpc_req_finished(oap->oap_request);
2046                 oap->oap_request = NULL;
2047         }
2048
        /*
         * page completion may be called only if the ->cpo_prep() method was
         * executed by osc_io_submit(), which also adds the page to the
         * pending list
         */
2053         if (!cfs_list_empty(&oap->oap_pending_item)) {
2054                 cfs_list_del_init(&oap->oap_pending_item);
2055                 cfs_list_del_init(&oap->oap_urgent_item);
2056
2057                 loi = oap->oap_loi;
2058                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2059                         &loi->loi_write_lop : &loi->loi_read_lop;
2060                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2061                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2062                 rc = oap->oap_caller_ops->ap_completion(env,
2063                                           oap->oap_caller_data,
2064                                           oap->oap_cmd, NULL, -EINTR);
2065         }
2066
2067         RETURN(rc);
2068 }
2069
/* this is trying to propagate async writeback errors back up to the
 * application.  As an async write fails we record the error code for later if
 * the app does an fsync.  As long as errors persist we force future rpcs to be
 * sync so that the app can get a sync error and break the cycle of queueing
 * pages for which writeback will fail. */
2075 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2076                            int rc)
2077 {
2078         if (rc) {
2079                 if (!ar->ar_rc)
2080                         ar->ar_rc = rc;
2081
2082                 ar->ar_force_sync = 1;
2083                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2084                 return;
2086         }
2087
2088         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2089                 ar->ar_force_sync = 0;
2090 }
2091
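/* Queue an oap on its object's pending list; ASYNC_HP pages go to the head
 * of the urgent list, ASYNC_URGENT pages to its tail. */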
2092 void osc_oap_to_pending(struct osc_async_page *oap)
2093 {
2094         struct loi_oap_pages *lop;
2095
2096         if (oap->oap_cmd & OBD_BRW_WRITE)
2097                 lop = &oap->oap_loi->loi_write_lop;
2098         else
2099                 lop = &oap->oap_loi->loi_read_lop;
2100
2101         if (oap->oap_async_flags & ASYNC_HP)
2102                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2103         else if (oap->oap_async_flags & ASYNC_URGENT)
2104                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2105         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2106         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2107 }
2108
2109 /* this must be called holding the loi list lock to give coverage to exit_cache,
2110  * async_flag maintenance, and oap_request */
2111 static void osc_ap_completion(const struct lu_env *env,
2112                               struct client_obd *cli, struct obdo *oa,
2113                               struct osc_async_page *oap, int sent, int rc)
2114 {
2115         __u64 xid = 0;
2116
2117         ENTRY;
2118         if (oap->oap_request != NULL) {
2119                 xid = ptlrpc_req_xid(oap->oap_request);
2120                 ptlrpc_req_finished(oap->oap_request);
2121                 oap->oap_request = NULL;
2122         }
2123
2124         cfs_spin_lock(&oap->oap_lock);
2125         oap->oap_async_flags = 0;
2126         cfs_spin_unlock(&oap->oap_lock);
2127         oap->oap_interrupted = 0;
2128
2129         if (oap->oap_cmd & OBD_BRW_WRITE) {
2130                 osc_process_ar(&cli->cl_ar, xid, rc);
2131                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2132         }
2133
2134         if (rc == 0 && oa != NULL) {
2135                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2136                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2137                 if (oa->o_valid & OBD_MD_FLMTIME)
2138                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2139                 if (oa->o_valid & OBD_MD_FLATIME)
2140                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2141                 if (oa->o_valid & OBD_MD_FLCTIME)
2142                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2143         }
2144
2145         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2146                                                 oap->oap_cmd, oa, rc);
2147
        /* ll_ap_completion (from llite) drops PG_locked, so a new
         * I/O on the page could start; but OSC calls it under the loi
         * list lock, and thus we can safely add the oap back to pending */
2151         if (rc)
2152                 /* upper layer wants to leave the page on pending queue */
2153                 osc_oap_to_pending(oap);
2154         else
2155                 osc_exit_cache(cli, oap, sent);
2156         EXIT;
2157 }
2158
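/* Completion callback for asynchronous BRW RPCs: finish the bulk transfer,
 * retry recoverable errors, update the in-flight counters, and complete or
 * release every page that was carried in this request. */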
2159 static int brw_interpret(const struct lu_env *env,
2160                          struct ptlrpc_request *req, void *data, int rc)
2161 {
2162         struct osc_brw_async_args *aa = data;
2163         struct client_obd *cli;
2164         int async;
2165         ENTRY;
2166
2167         rc = osc_brw_fini_request(req, rc);
2168         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2169         if (osc_recoverable_error(rc)) {
2170                 rc = osc_brw_redo_request(req, aa);
2171                 if (rc == 0)
2172                         RETURN(0);
2173         }
2174
2175         if (aa->aa_ocapa) {
2176                 capa_put(aa->aa_ocapa);
2177                 aa->aa_ocapa = NULL;
2178         }
2179
2180         cli = aa->aa_cli;
2181
2182         client_obd_list_lock(&cli->cl_loi_list_lock);
2183
2184         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2185          * is called so we know whether to go to sync BRWs or wait for more
2186          * RPCs to complete */
2187         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2188                 cli->cl_w_in_flight--;
2189         else
2190                 cli->cl_r_in_flight--;
2191
2192         async = cfs_list_empty(&aa->aa_oaps);
2193         if (!async) { /* from osc_send_oap_rpc() */
2194                 struct osc_async_page *oap, *tmp;
2195                 /* the caller may re-use the oap after the completion call so
2196                  * we need to clean it up a little */
2197                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2198                                              oap_rpc_item) {
2199                         cfs_list_del_init(&oap->oap_rpc_item);
2200                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2201                 }
2202                 OBDO_FREE(aa->aa_oa);
2203         } else { /* from async_internal() */
2204                 int i;
2205                 for (i = 0; i < aa->aa_page_count; i++)
2206                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2207
2208                 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2209                         OBDO_FREE(aa->aa_oa);
2210         }
2211         osc_wake_cache_waiters(cli);
2212         osc_check_rpcs(env, cli);
2213         client_obd_list_unlock(&cli->cl_loi_list_lock);
2214         if (!async)
2215                 cl_req_completion(env, aa->aa_clerq, rc);
2216         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2217         RETURN(rc);
2218 }
2219
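/* Turn a list of queued async pages into a single BRW request: allocate the
 * page array and obdo, attach each page to a cl_req, fill in the request
 * attributes, and hand back the prepared (not yet sent) request. */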
2220 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2221                                             struct client_obd *cli,
2222                                             cfs_list_t *rpc_list,
2223                                             int page_count, int cmd)
2224 {
2225         struct ptlrpc_request *req;
2226         struct brw_page **pga = NULL;
2227         struct osc_brw_async_args *aa;
2228         struct obdo *oa = NULL;
2229         const struct obd_async_page_ops *ops = NULL;
2230         void *caller_data = NULL;
2231         struct osc_async_page *oap;
2232         struct osc_async_page *tmp;
2233         struct ost_body *body;
2234         struct cl_req *clerq = NULL;
2235         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2236         struct ldlm_lock *lock = NULL;
2237         struct cl_req_attr crattr;
2238         int i, rc, mpflag = 0;
2239
2240         ENTRY;
2241         LASSERT(!cfs_list_empty(rpc_list));
2242
2243         if (cmd & OBD_BRW_MEMALLOC)
2244                 mpflag = cfs_memory_pressure_get_and_set();
2245
2246         memset(&crattr, 0, sizeof crattr);
2247         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2248         if (pga == NULL)
2249                 GOTO(out, req = ERR_PTR(-ENOMEM));
2250
2251         OBDO_ALLOC(oa);
2252         if (oa == NULL)
2253                 GOTO(out, req = ERR_PTR(-ENOMEM));
2254
2255         i = 0;
2256         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2257                 struct cl_page *page = osc_oap2cl_page(oap);
2258                 if (ops == NULL) {
2259                         ops = oap->oap_caller_ops;
2260                         caller_data = oap->oap_caller_data;
2261
2262                         clerq = cl_req_alloc(env, page, crt,
2263                                              1 /* only 1-object rpcs for
2264                                                 * now */);
2265                         if (IS_ERR(clerq))
2266                                 GOTO(out, req = (void *)clerq);
2267                         lock = oap->oap_ldlm_lock;
2268                 }
2269                 pga[i] = &oap->oap_brw_page;
2270                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2271                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2272                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2273                 i++;
2274                 cl_req_page_add(env, clerq, page);
2275         }
2276
2277         /* always get the data for the obdo for the rpc */
2278         LASSERT(ops != NULL);
2279         crattr.cra_oa = oa;
2280         crattr.cra_capa = NULL;
2281         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2282         if (lock) {
2283                 oa->o_handle = lock->l_remote_handle;
2284                 oa->o_valid |= OBD_MD_FLHANDLE;
2285         }
2286
2287         rc = cl_req_prep(env, clerq);
2288         if (rc != 0) {
2289                 CERROR("cl_req_prep failed: %d\n", rc);
2290                 GOTO(out, req = ERR_PTR(rc));
2291         }
2292
2293         sort_brw_pages(pga, page_count);
2294         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2295                                   pga, &req, crattr.cra_capa, 1);
2296         if (rc != 0) {
2297                 CERROR("prep_req failed: %d\n", rc);
2298                 GOTO(out, req = ERR_PTR(rc));
2299         }
2300
2301         if (cmd & OBD_BRW_MEMALLOC)
2302                 req->rq_memalloc = 1;
2303
2304         /* Need to update the timestamps after the request is built in case
2305          * we race with setattr (locally or in queue at OST).  If OST gets
2306          * later setattr before earlier BRW (as determined by the request xid),
2307          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2308          * way to do this in a single call.  bug 10150 */
2309         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2310         cl_req_attr_set(env, clerq, &crattr,
2311                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2312
2313         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2314         aa = ptlrpc_req_async_args(req);
2315         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2316         cfs_list_splice(rpc_list, &aa->aa_oaps);
2317         CFS_INIT_LIST_HEAD(rpc_list);
2318         aa->aa_clerq = clerq;
2319 out:
2320         if (cmd & OBD_BRW_MEMALLOC)
2321                 cfs_memory_pressure_restore(mpflag);
2322
2323         capa_put(crattr.cra_capa);
2324         if (IS_ERR(req)) {
2325                 if (oa)
2326                         OBDO_FREE(oa);
2327                 if (pga)
2328                         OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad: it makes the
                 * pending list no longer follow the dirty order */
2331                 client_obd_list_lock(&cli->cl_loi_list_lock);
2332                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2333                         cfs_list_del_init(&oap->oap_rpc_item);
2334
2335                         /* queued sync pages can be torn down while the pages
2336                          * were between the pending list and the rpc */
2337                         if (oap->oap_interrupted) {
2338                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2339                                 osc_ap_completion(env, cli, NULL, oap, 0,
2340                                                   oap->oap_count);
2341                                 continue;
2342                         }
2343                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2344                 }
2345                 if (clerq && !IS_ERR(clerq))
2346                         cl_req_completion(env, clerq, PTR_ERR(req));
2347         }
2348         RETURN(req);
2349 }
2350
2351 /**
2352  * prepare pages for ASYNC io and put pages in send queue.
2353  *
 * \param cmd OBD_BRW_* macros
 * \param lop pending pages
 *
 * \return zero if no pages were added to the send queue.
2358  * \return 1 if pages successfully added to send queue.
2359  * \return negative on errors.
2360  */
2361 static int
2362 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2363                  struct lov_oinfo *loi,
2364                  int cmd, struct loi_oap_pages *lop)
2365 {
2366         struct ptlrpc_request *req;
2367         obd_count page_count = 0;
2368         struct osc_async_page *oap = NULL, *tmp;
2369         struct osc_brw_async_args *aa;
2370         const struct obd_async_page_ops *ops;
2371         CFS_LIST_HEAD(rpc_list);
2372         CFS_LIST_HEAD(tmp_list);
2373         unsigned int ending_offset;
2374         unsigned  starting_offset = 0;
2375         int srvlock = 0, mem_tight = 0;
2376         struct cl_object *clob = NULL;
2377         ENTRY;
2378
        /* ASYNC_HP pages first.  At present, when a lock on the pages is to
         * be canceled, the pages covered by the lock are sent out with
         * ASYNC_HP.  We have to send them out as soon as possible. */
2382         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
                if (oap->oap_async_flags & ASYNC_HP)
2384                         cfs_list_move(&oap->oap_pending_item, &tmp_list);
2385                 else
2386                         cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2387                 if (++page_count >= cli->cl_max_pages_per_rpc)
2388                         break;
2389         }
2390
2391         cfs_list_splice(&tmp_list, &lop->lop_pending);
2392         page_count = 0;
2393
2394         /* first we find the pages we're allowed to work with */
2395         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2396                                      oap_pending_item) {
2397                 ops = oap->oap_caller_ops;
2398
2399                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2400                          "magic 0x%x\n", oap, oap->oap_magic);
2401
2402                 if (clob == NULL) {
2403                         /* pin object in memory, so that completion call-backs
2404                          * can be safely called under client_obd_list lock. */
2405                         clob = osc_oap2cl_page(oap)->cp_obj;
2406                         cl_object_get(clob);
2407                 }
2408
2409                 if (page_count != 0 &&
2410                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2411                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2412                                " oap %p, page %p, srvlock %u\n",
2413                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2414                         break;
2415                 }
2416
2417                 /* If there is a gap at the start of this page, it can't merge
2418                  * with any previous page, so we'll hand the network a
2419                  * "fragmented" page array that it can't transfer in 1 RDMA */
2420                 if (page_count != 0 && oap->oap_page_off != 0)
2421                         break;
2422
2423                 /* in llite being 'ready' equates to the page being locked
2424                  * until completion unlocks it.  commit_write submits a page
2425                  * as not ready because its unlock will happen unconditionally
2426                  * as the call returns.  if we race with commit_write giving
2427                  * us that page we don't want to create a hole in the page
2428                  * stream, so we stop and leave the rpc to be fired by
2429                  * another dirtier or kupdated interval (the not ready page
2430                  * will still be on the dirty list).  we could call in
2431                  * at the end of ll_file_write to process the queue again. */
2432                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2433                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2434                                                     cmd);
2435                         if (rc < 0)
2436                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2437                                                 "instead of ready\n", oap,
2438                                                 oap->oap_page, rc);
2439                         switch (rc) {
2440                         case -EAGAIN:
2441                                 /* llite is telling us that the page is still
2442                                  * in commit_write and that we should try
2443                                  * and put it in an rpc again later.  we
2444                                  * break out of the loop so we don't create
2445                                  * a hole in the sequence of pages in the rpc
2446                                  * stream.*/
2447                                 oap = NULL;
2448                                 break;
2449                         case -EINTR:
                                /* the io isn't needed; tell the checks
                                 * below to complete the rpc with EINTR */
2452                                 cfs_spin_lock(&oap->oap_lock);
2453                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2454                                 cfs_spin_unlock(&oap->oap_lock);
2455                                 oap->oap_count = -EINTR;
2456                                 break;
2457                         case 0:
2458                                 cfs_spin_lock(&oap->oap_lock);
2459                                 oap->oap_async_flags |= ASYNC_READY;
2460                                 cfs_spin_unlock(&oap->oap_lock);
2461                                 break;
2462                         default:
2463                                 LASSERTF(0, "oap %p page %p returned %d "
2464                                             "from make_ready\n", oap,
2465                                             oap->oap_page, rc);
2466                                 break;
2467                         }
2468                 }
2469                 if (oap == NULL)
2470                         break;
2471                 /*
2472                  * Page submitted for IO has to be locked. Either by
2473                  * ->ap_make_ready() or by higher layers.
2474                  */
2475 #if defined(__KERNEL__) && defined(__linux__)
2476                 {
2477                         struct cl_page *page;
2478
2479                         page = osc_oap2cl_page(oap);
2480
2481                         if (page->cp_type == CPT_CACHEABLE &&
2482                             !(PageLocked(oap->oap_page) &&
2483                               (CheckWriteback(oap->oap_page, cmd)))) {
2484                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2485                                        oap->oap_page,
2486                                        (long)oap->oap_page->flags,
2487                                        oap->oap_async_flags);
2488                                 LBUG();
2489                         }
2490                 }
2491 #endif
2492
2493                 /* take the page out of our book-keeping */
2494                 cfs_list_del_init(&oap->oap_pending_item);
2495                 lop_update_pending(cli, lop, cmd, -1);
2496                 cfs_list_del_init(&oap->oap_urgent_item);
2497
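                /* remember where this rpc starts, modulo PTLRPC_MAX_BRW_SIZE,
                 * for the lprocfs offset histograms below */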
2498                 if (page_count == 0)
2499                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2500                                           (PTLRPC_MAX_BRW_SIZE - 1);
2501
2502                 /* ask the caller for the size of the io as the rpc leaves. */
2503                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2504                         oap->oap_count =
2505                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2506                                                       cmd);
2507                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2508                 }
2509                 if (oap->oap_count <= 0) {
2510                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2511                                oap->oap_count);
2512                         osc_ap_completion(env, cli, NULL,
2513                                           oap, 0, oap->oap_count);
2514                         continue;
2515                 }
2516
2517                 /* now put the page back in our accounting */
2518                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2519                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2520                         mem_tight = 1;
2521                 if (page_count == 0)
2522                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2523                 if (++page_count >= cli->cl_max_pages_per_rpc)
2524                         break;
2525
2526                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2527                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2528                  * have the same alignment as the initial writes that allocated
2529                  * extents on the server. */
2530                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2531                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2532                 if (ending_offset == 0)
2533                         break;
2534
2535                 /* If there is a gap at the end of this page, it can't merge
2536                  * with any subsequent pages, so we'll hand the network a
2537                  * "fragmented" page array that it can't transfer in 1 RDMA */
2538                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2539                         break;
2540         }
2541
2542         osc_wake_cache_waiters(cli);
2543
2544         loi_list_maint(cli, loi);
2545
2546         client_obd_list_unlock(&cli->cl_loi_list_lock);
2547
2548         if (clob != NULL)
2549                 cl_object_put(env, clob);
2550
2551         if (page_count == 0) {
2552                 client_obd_list_lock(&cli->cl_loi_list_lock);
2553                 RETURN(0);
2554         }
2555
2556         req = osc_build_req(env, cli, &rpc_list, page_count,
2557                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2558         if (IS_ERR(req)) {
2559                 LASSERT(cfs_list_empty(&rpc_list));
2560                 loi_list_maint(cli, loi);
2561                 RETURN(PTR_ERR(req));
2562         }
2563
2564         aa = ptlrpc_req_async_args(req);
2565
2566         if (cmd == OBD_BRW_READ) {
2567                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2568                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2569                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2570                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2571         } else {
2572                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2573                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2574                                  cli->cl_w_in_flight);
2575                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2576                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2577         }
2578         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2579
2580         client_obd_list_lock(&cli->cl_loi_list_lock);
2581
2582         if (cmd == OBD_BRW_READ)
2583                 cli->cl_r_in_flight++;
2584         else
2585                 cli->cl_w_in_flight++;
2586
2587         /* queued sync pages can be torn down while the pages
2588          * were between the pending list and the rpc */
2589         tmp = NULL;
2590         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2591                 /* only one oap gets a request reference */
2592                 if (tmp == NULL)
2593                         tmp = oap;
2594                 if (oap->oap_interrupted && !req->rq_intr) {
2595                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2596                                oap, req);
2597                         ptlrpc_mark_interrupted(req);
2598                 }
2599         }
2600         if (tmp != NULL)
2601                 tmp->oap_request = ptlrpc_request_addref(req);
2602
2603         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2604                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2605
2606         req->rq_interpret_reply = brw_interpret;
2607         ptlrpcd_add_req(req, PSCOPE_BRW);
2608         RETURN(1);
2609 }
2610
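/* Dump an loi's ready/pending/urgent state for debugging. */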
2611 #define LOI_DEBUG(LOI, STR, args...)                                     \
2612         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2613                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2614                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2615                (LOI)->loi_write_lop.lop_num_pending,                     \
2616                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2617                (LOI)->loi_read_lop.lop_num_pending,                      \
2618                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
               args)
2620
2621 /* This is called by osc_check_rpcs() to find which objects have pages that
2622  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2623 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2624 {
2625         ENTRY;
2626
2627         /* First return objects that have blocked locks so that they
2628          * will be flushed quickly and other clients can get the lock,
2629          * then objects which have pages ready to be stuffed into RPCs */
2630         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2631                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2632                                       struct lov_oinfo, loi_hp_ready_item));
2633         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2634                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2635                                       struct lov_oinfo, loi_ready_item));
2636
        /* then if we have cache waiters, return all objects with queued
         * writes.  This is especially important when many small files
         * have filled up the cache and not been fired into rpcs because
         * they don't pass the nr_pending/object threshold */
2641         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2642             !cfs_list_empty(&cli->cl_loi_write_list))
2643                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2644                                       struct lov_oinfo, loi_write_item));
2645
2646         /* then return all queued objects when we have an invalid import
2647          * so that they get flushed */
2648         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2649                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2650                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2651                                               struct lov_oinfo,
2652                                               loi_write_item));
2653                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2654                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2655                                               struct lov_oinfo, loi_read_item));
2656         }
2657         RETURN(NULL);
2658 }
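/*
 * Illustration (editorial, with hypothetical state): given
 *
 *         cl_loi_hp_ready_list:  [objA]
 *         cl_loi_ready_list:     [objB]
 *         cl_cache_waiters:      non-empty
 *         cl_loi_write_list:     [objC]
 *
 * successive osc_next_loi() calls (with list maintenance in between) drain
 * objA first (a blocked lock is waiting), then objB (pages ready for an
 * RPC), and only then objC on behalf of the cache waiters.
 */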
2659
2660 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2661 {
2662         struct osc_async_page *oap;
2663         int hprpc = 0;
2664
2665         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2666                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2667                                      struct osc_async_page, oap_urgent_item);
2668                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2669         }
2670
2671         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2672                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2673                                      struct osc_async_page, oap_urgent_item);
2674                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2675         }
2676
2677         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2678 }
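/*
 * Worked example (editorial): with cl_max_rpcs_in_flight = 8, the function
 * normally reports "full" once rpcs_in_flight(cli) reaches 8.  If the first
 * urgent oap on either lop carries ASYNC_HP, the limit is raised by one,
 * allowing a 9th RPC so a blocked lock can be serviced without queueing
 * behind ordinary traffic.
 */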
2679
2680 /* called with the loi list lock held */
2681 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2682 {
2683         struct lov_oinfo *loi;
2684         int rc = 0, race_counter = 0;
2685         ENTRY;
2686
2687         while ((loi = osc_next_loi(cli)) != NULL) {
2688                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2689
2690                 if (osc_max_rpc_in_flight(cli, loi))
2691                         break;
2692
2693                 /* Attempt some read/write balancing by alternating between
2694                  * reads and writes in an object.  The makes_rpc checks here
2695                  * would be redundant if we were getting read/write work items
2696                  * instead of objects.  We don't want send_oap_rpc to drain a
2697                  * partial read pending queue when we're given this object to
2698                  * do write io on while there are cache waiters. */
2699                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2700                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2701                                               &loi->loi_write_lop);
2702                         if (rc < 0) {
2703                                 CERROR("Write request failed with %d\n", rc);
2704
2705                                 /* osc_send_oap_rpc failed, mostly because
2706                                  * of memory pressure.
2707                                  *
2708                                  * We can't break out here, because if:
2709                                  *  - a page was submitted by osc_io_submit,
2710                                  *    so that page is locked;
2711                                  *  - no request is in flight;
2712                                  *  - no subsequent request will be sent;
2713                                  * then the system ends up in a live-lock
2714                                  * state, since there is no further chance
2715                                  * to call osc_io_unplug() and
2716                                  * osc_check_rpcs().  pdflush can't help
2717                                  * here either, because it might block on
2718                                  * grabbing the page lock as noted above.
2719                                  *
2720                                  * Anyway, continue to drain pages. */
2721                                 /* break; */
2722                         }
2723
2724                         if (rc > 0)
2725                                 race_counter = 0;
2726                         else
2727                                 race_counter++;
2728                 }
2729                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2730                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2731                                               &loi->loi_read_lop);
2732                         if (rc < 0)
2733                                 CERROR("Read request failed with %d\n", rc);
2734
2735                         if (rc > 0)
2736                                 race_counter = 0;
2737                         else
2738                                 race_counter++;
2739                 }
2740
2741                 /* attempt some inter-object balancing by issuing rpcs
2742                  * for each object in turn */
2743                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2744                         cfs_list_del_init(&loi->loi_hp_ready_item);
2745                 if (!cfs_list_empty(&loi->loi_ready_item))
2746                         cfs_list_del_init(&loi->loi_ready_item);
2747                 if (!cfs_list_empty(&loi->loi_write_item))
2748                         cfs_list_del_init(&loi->loi_write_item);
2749                 if (!cfs_list_empty(&loi->loi_read_item))
2750                         cfs_list_del_init(&loi->loi_read_item);
2751
2752                 loi_list_maint(cli, loi);
2753
2754                 /* send_oap_rpc returns 0 when make_ready tells it to
2755                  * back off.  llite's make_ready does this when it tries
2756                  * to lock a page queued for write that is already locked.
2757                  * We want to try sending rpcs from many objects, but we
2758                  * don't want to spin failing with 0. */
2759                 if (race_counter == 10)
2760                         break;
2761         }
2762         EXIT;
2763 }
2764
2765 /* we're trying to queue a page in the osc so we're subject to the
2766  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2767  * If the osc's queued pages are already at that limit, then we want to sleep
2768  * until there is space in the osc's queue for us.  We also may be waiting for
2769  * write credits from the OST if there are RPCs in flight that may return some
2770  * before we fall back to sync writes.
2771  *
2772  * We need this to know whether our allocation was granted in the presence of signals */
2773 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2774 {
2775         int rc;
2776         ENTRY;
2777         client_obd_list_lock(&cli->cl_loi_list_lock);
2778         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2779         client_obd_list_unlock(&cli->cl_loi_list_lock);
2780         RETURN(rc);
2781 }
2782
2783 /**
2784  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2785  * is available.
2786  */
2787 int osc_enter_cache_try(const struct lu_env *env,
2788                         struct client_obd *cli, struct lov_oinfo *loi,
2789                         struct osc_async_page *oap, int transient)
2790 {
2791         int has_grant;
2792
2793         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2794         if (has_grant) {
2795                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2796                 if (transient) {
2797                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2798                         cfs_atomic_inc(&obd_dirty_transit_pages);
2799                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2800                 }
2801         }
2802         return has_grant;
2803 }
2804
2805 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2806  * grant or cache space. */
2807 static int osc_enter_cache(const struct lu_env *env,
2808                            struct client_obd *cli, struct lov_oinfo *loi,
2809                            struct osc_async_page *oap)
2810 {
2811         struct osc_cache_waiter ocw;
2812         struct l_wait_info lwi = { 0 };
2813
2814         ENTRY;
2815
2816         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2817                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2818                cli->cl_dirty_max, obd_max_dirty_pages,
2819                cli->cl_lost_grant, cli->cl_avail_grant);
2820
2821         /* force the caller to try sync io.  this can jump the list
2822          * of queued writes and create a discontiguous rpc stream */
2823         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2824             loi->loi_ar.ar_force_sync)
2825                 RETURN(-EDQUOT);
2826
2827         /* Hopefully normal case - cache space and write credits available */
2828         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2829             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2830             osc_enter_cache_try(env, cli, loi, oap, 0))
2831                 RETURN(0);
2832
2833         /* It is safe to block as a cache waiter as long as there is grant
2834          * space available or the hope of additional grant being returned
2835          * when an in flight write completes.  Using the write back cache
2836          * if possible is preferable to sending the data synchronously
2837          * because write pages can then be merged in to large requests.
2838          * The addition of this cache waiter will cause pending write
2839          * pages to be sent immediately. */
2840         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2841                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2842                 cfs_waitq_init(&ocw.ocw_waitq);
2843                 ocw.ocw_oap = oap;
2844                 ocw.ocw_rc = 0;
2845
2846                 loi_list_maint(cli, loi);
2847                 osc_check_rpcs(env, cli);
2848                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2849
2850                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2851                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2852
2853                 client_obd_list_lock(&cli->cl_loi_list_lock);
2854                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2855                         cfs_list_del(&ocw.ocw_entry);
2856                         RETURN(-EINTR);
2857                 }
2858                 RETURN(ocw.ocw_rc);
2859         }
2860
2861         RETURN(-EDQUOT);
2862 }
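/*
 * Decision sketch (editorial, hypothetical numbers): assume CFS_PAGE_SIZE
 * is 4096, cl_dirty has reached cl_dirty_max, and cl_avail_grant is 64KB.
 * The fast path above fails (dirty limit hit), but since grant is still
 * available the caller is queued as a cache waiter and sleeps in
 * l_wait_event() until completed writes release dirty pages and wake it.
 * Only when there is neither available grant nor a write in flight does
 * the function return -EDQUOT and force the caller into sync io.
 */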
2863
2864
2865 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2866                         struct lov_oinfo *loi, cfs_page_t *page,
2867                         obd_off offset, const struct obd_async_page_ops *ops,
2868                         void *data, void **res, int nocache,
2869                         struct lustre_handle *lockh)
2870 {
2871         struct osc_async_page *oap;
2872
2873         ENTRY;
2874
2875         if (!page)
2876                 return cfs_size_round(sizeof(*oap));
2877
2878         oap = *res;
2879         oap->oap_magic = OAP_MAGIC;
2880         oap->oap_cli = &exp->exp_obd->u.cli;
2881         oap->oap_loi = loi;
2882
2883         oap->oap_caller_ops = ops;
2884         oap->oap_caller_data = data;
2885
2886         oap->oap_page = page;
2887         oap->oap_obj_off = offset;
2888         if (!client_is_remote(exp) &&
2889             cfs_capable(CFS_CAP_SYS_RESOURCE))
2890                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2891
2892         LASSERT(!(offset & ~CFS_PAGE_MASK));
2893
2894         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2895         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2896         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2897         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2898
2899         cfs_spin_lock_init(&oap->oap_lock);
2900         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2901         RETURN(0);
2902 }
2903
2904 struct osc_async_page *oap_from_cookie(void *cookie)
2905 {
2906         struct osc_async_page *oap = cookie;
2907         if (oap->oap_magic != OAP_MAGIC)
2908                 return ERR_PTR(-EINVAL);
2909         return oap;
2910 }
2911
2912 int osc_queue_async_io(const struct lu_env *env,
2913                        struct obd_export *exp, struct lov_stripe_md *lsm,
2914                        struct lov_oinfo *loi, void *cookie,
2915                        int cmd, obd_off off, int count,
2916                        obd_flag brw_flags, enum async_flags async_flags)
2917 {
2918         struct client_obd *cli = &exp->exp_obd->u.cli;
2919         struct osc_async_page *oap;
2920         int rc = 0;
2921         ENTRY;
2922
2923         oap = oap_from_cookie(cookie);
2924         if (IS_ERR(oap))
2925                 RETURN(PTR_ERR(oap));
2926
2927         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2928                 RETURN(-EIO);
2929
2930         if (!cfs_list_empty(&oap->oap_pending_item) ||
2931             !cfs_list_empty(&oap->oap_urgent_item) ||
2932             !cfs_list_empty(&oap->oap_rpc_item))
2933                 RETURN(-EBUSY);
2934
2935         /* check if the file's owner/group is over quota */
2936         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2937                 struct cl_object *obj;
2938                 struct cl_attr    attr; /* XXX put attr into thread info */
2939                 unsigned int qid[MAXQUOTAS];
2940
2941                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2942
2943                 cl_object_attr_lock(obj);
2944                 rc = cl_object_attr_get(env, obj, &attr);
2945                 cl_object_attr_unlock(obj);
2946
2947                 qid[USRQUOTA] = attr.cat_uid;
2948                 qid[GRPQUOTA] = attr.cat_gid;
2949                 if (rc == 0 &&
2950                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2951                         rc = -EDQUOT;
2952                 if (rc)
2953                         RETURN(rc);
2954         }
2955
2956         if (loi == NULL)
2957                 loi = lsm->lsm_oinfo[0];
2958
2959         client_obd_list_lock(&cli->cl_loi_list_lock);
2960
2961         LASSERT(off + count <= CFS_PAGE_SIZE);
2962         oap->oap_cmd = cmd;
2963         oap->oap_page_off = off;
2964         oap->oap_count = count;
2965         oap->oap_brw_flags = brw_flags;
2966         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2967         if (cfs_memory_pressure_get())
2968                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2969         cfs_spin_lock(&oap->oap_lock);
2970         oap->oap_async_flags = async_flags;
2971         cfs_spin_unlock(&oap->oap_lock);
2972
2973         if (cmd & OBD_BRW_WRITE) {
2974                 rc = osc_enter_cache(env, cli, loi, oap);
2975                 if (rc) {
2976                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2977                         RETURN(rc);
2978                 }
2979         }
2980
2981         osc_oap_to_pending(oap);
2982         loi_list_maint(cli, loi);
2983
2984         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2985                   cmd);
2986
2987         osc_check_rpcs(env, cli);
2988         client_obd_list_unlock(&cli->cl_loi_list_lock);
2989
2990         RETURN(0);
2991 }
2992
2993 /* aka (~was & now & flag), but this is more clear :) */
2994 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
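/* Editorial example: SETTING(0x1, 0x3, 0x2) is 1 since bit 0x2 is being
 * turned on, while SETTING(0x3, 0x3, 0x2) is 0 since it was already set. */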
2995
2996 int osc_set_async_flags_base(struct client_obd *cli,
2997                              struct lov_oinfo *loi, struct osc_async_page *oap,
2998                              obd_flag async_flags)
2999 {
3000         struct loi_oap_pages *lop;
3001         int flags = 0;
3002         ENTRY;
3003
3004         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3005
3006         if (oap->oap_cmd & OBD_BRW_WRITE) {
3007                 lop = &loi->loi_write_lop;
3008         } else {
3009                 lop = &loi->loi_read_lop;
3010         }
3011
3012         if ((oap->oap_async_flags & async_flags) == async_flags)
3013                 RETURN(0);
3014
3015         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3016                 flags |= ASYNC_READY;
3017
3018         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3019             cfs_list_empty(&oap->oap_rpc_item)) {
3020                 if (oap->oap_async_flags & ASYNC_HP)
3021                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3022                 else
3023                         cfs_list_add_tail(&oap->oap_urgent_item,
3024                                           &lop->lop_urgent);
3025                 flags |= ASYNC_URGENT;
3026                 loi_list_maint(cli, loi);
3027         }
3028         cfs_spin_lock(&oap->oap_lock);
3029         oap->oap_async_flags |= flags;
3030         cfs_spin_unlock(&oap->oap_lock);
3031
3032         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3033                         oap->oap_async_flags);
3034         RETURN(0);
3035 }
3036
3037 int osc_teardown_async_page(struct obd_export *exp,
3038                             struct lov_stripe_md *lsm,
3039                             struct lov_oinfo *loi, void *cookie)
3040 {
3041         struct client_obd *cli = &exp->exp_obd->u.cli;
3042         struct loi_oap_pages *lop;
3043         struct osc_async_page *oap;
3044         int rc = 0;
3045         ENTRY;
3046
3047         oap = oap_from_cookie(cookie);
3048         if (IS_ERR(oap))
3049                 RETURN(PTR_ERR(oap));
3050
3051         if (loi == NULL)
3052                 loi = lsm->lsm_oinfo[0];
3053
3054         if (oap->oap_cmd & OBD_BRW_WRITE) {
3055                 lop = &loi->loi_write_lop;
3056         } else {
3057                 lop = &loi->loi_read_lop;
3058         }
3059
3060         client_obd_list_lock(&cli->cl_loi_list_lock);
3061
3062         if (!cfs_list_empty(&oap->oap_rpc_item))
3063                 GOTO(out, rc = -EBUSY);
3064
3065         osc_exit_cache(cli, oap, 0);
3066         osc_wake_cache_waiters(cli);
3067
3068         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3069                 cfs_list_del_init(&oap->oap_urgent_item);
3070                 cfs_spin_lock(&oap->oap_lock);
3071                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3072                 cfs_spin_unlock(&oap->oap_lock);
3073         }
3074         if (!cfs_list_empty(&oap->oap_pending_item)) {
3075                 cfs_list_del_init(&oap->oap_pending_item);
3076                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3077         }
3078         loi_list_maint(cli, loi);
3079         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3080 out:
3081         client_obd_list_unlock(&cli->cl_loi_list_lock);
3082         RETURN(rc);
3083 }
3084
3085 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3086                                          struct ldlm_enqueue_info *einfo,
3087                                          int flags)
3088 {
3089         void *data = einfo->ei_cbdata;
3090
3091         LASSERT(lock != NULL);
3092         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3093         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3094         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3095         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3096
3097         lock_res_and_lock(lock);
3098         cfs_spin_lock(&osc_ast_guard);
3099         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3100         lock->l_ast_data = data;
3101         cfs_spin_unlock(&osc_ast_guard);
3102         unlock_res_and_lock(lock);
3103 }
3104
3105 static void osc_set_data_with_check(struct lustre_handle *lockh,
3106                                     struct ldlm_enqueue_info *einfo,
3107                                     int flags)
3108 {
3109         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3110
3111         if (lock != NULL) {
3112                 osc_set_lock_data_with_check(lock, einfo, flags);
3113                 LDLM_LOCK_PUT(lock);
3114         } else
3115                 CERROR("lockh %p, data %p - client evicted?\n",
3116                        lockh, einfo->ei_cbdata);
3117 }
3118
3119 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3120                              ldlm_iterator_t replace, void *data)
3121 {
3122         struct ldlm_res_id res_id;
3123         struct obd_device *obd = class_exp2obd(exp);
3124
3125         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3126         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3127         return 0;
3128 }
3129
3130 /* Find any ldlm lock of the inode in osc.
3131  * Return 0    if no lock is found,
3132  *        1    if one is found,
3133  *      < 0    on error. */
3134 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3135                            ldlm_iterator_t replace, void *data)
3136 {
3137         struct ldlm_res_id res_id;
3138         struct obd_device *obd = class_exp2obd(exp);
3139         int rc = 0;
3140
3141         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3142         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3143         if (rc == LDLM_ITER_STOP)
3144                 return(1);
3145         if (rc == LDLM_ITER_CONTINUE)
3146                 return(0);
3147         return(rc);
3148 }
3149
3150 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3151                             obd_enqueue_update_f upcall, void *cookie,
3152                             int *flags, int rc)
3153 {
3154         int intent = *flags & LDLM_FL_HAS_INTENT;
3155         ENTRY;
3156
3157         if (intent) {
3158                 /* The request was created before ldlm_cli_enqueue call. */
3159                 if (rc == ELDLM_LOCK_ABORTED) {
3160                         struct ldlm_reply *rep;
3161                         rep = req_capsule_server_get(&req->rq_pill,
3162                                                      &RMF_DLM_REP);
3163
3164                         LASSERT(rep != NULL);
3165                         if (rep->lock_policy_res1)
3166                                 rc = rep->lock_policy_res1;
3167                 }
3168         }
3169
3170         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3171                 *flags |= LDLM_FL_LVB_READY;
3172                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3173                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3174         }
3175
3176         /* Call the update callback. */
3177         rc = (*upcall)(cookie, rc);
3178         RETURN(rc);
3179 }
3180
3181 static int osc_enqueue_interpret(const struct lu_env *env,
3182                                  struct ptlrpc_request *req,
3183                                  struct osc_enqueue_args *aa, int rc)
3184 {
3185         struct ldlm_lock *lock;
3186         struct lustre_handle handle;
3187         __u32 mode;
3188
3189         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3190          * might be freed anytime after lock upcall has been called. */
3191         lustre_handle_copy(&handle, aa->oa_lockh);
3192         mode = aa->oa_ei->ei_mode;
3193
3194         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3195          * be valid. */
3196         lock = ldlm_handle2lock(&handle);
3197
3198         /* Take an additional reference so that a blocking AST that
3199          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3200          * to arrive after an upcall has been executed by
3201          * osc_enqueue_fini(). */
3202         ldlm_lock_addref(&handle, mode);
3203
3204         /* Complete obtaining the lock procedure. */
3205         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3206                                    mode, aa->oa_flags, aa->oa_lvb,
3207                                    sizeof(*aa->oa_lvb), &handle, rc);
3208         /* Complete osc stuff. */
3209         rc = osc_enqueue_fini(req, aa->oa_lvb,
3210                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3211
3212         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3213
3214         /* Release the lock for async request. */
3215         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3216                 /*
3217                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3218                  * not already released by
3219                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3220                  */
3221                 ldlm_lock_decref(&handle, mode);
3222
3223         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3224                  aa->oa_lockh, req, aa);
3225         ldlm_lock_decref(&handle, mode);
3226         LDLM_LOCK_PUT(lock);
3227         return rc;
3228 }
3229
3230 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3231                         struct lov_oinfo *loi, int flags,
3232                         struct ost_lvb *lvb, __u32 mode, int rc)
3233 {
3234         if (rc == ELDLM_OK) {
3235                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3236                 __u64 tmp;
3237
3238                 LASSERT(lock != NULL);
3239                 loi->loi_lvb = *lvb;
3240                 tmp = loi->loi_lvb.lvb_size;
3241                 /* Extend KMS up to the end of this lock and no further
3242                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3243                 if (tmp > lock->l_policy_data.l_extent.end)
3244                         tmp = lock->l_policy_data.l_extent.end + 1;
3245                 if (tmp >= loi->loi_kms) {
3246                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3247                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3248                         loi_kms_set(loi, tmp);
3249                 } else {
3250                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3251                                    LPU64"; leaving kms="LPU64", end="LPU64,
3252                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3253                                    lock->l_policy_data.l_extent.end);
3254                 }
3255                 ldlm_lock_allow_match(lock);
3256                 LDLM_LOCK_PUT(lock);
3257         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3258                 loi->loi_lvb = *lvb;
3259                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3260                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3261                 rc = ELDLM_OK;
3262         }
3263 }
3264 EXPORT_SYMBOL(osc_update_enqueue);
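/*
 * KMS example (editorial): if the granted lock covers the extent [0, 4095]
 * and the returned lvb_size is 1MB, tmp is clamped to l_extent.end + 1 =
 * 4096, so the kms is raised to at most 4096 -- never beyond what this
 * lock actually protects.
 */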
3265
3266 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3267
3268 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
3269  * lock from the 2nd OSC before a lock from the 1st one. This does not
3270  * deadlock with other synchronous requests; however, holding some locks while
3271  * trying to obtain others may take a considerable amount of time in case of
3272  * an ost failure, and when other sync requests cannot get a lock released by
3273  * a client, that client is evicted from the cluster -- such scenarios make
3274  * life difficult, so release locks just after they are obtained. */
3275 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3276                      int *flags, ldlm_policy_data_t *policy,
3277                      struct ost_lvb *lvb, int kms_valid,
3278                      obd_enqueue_update_f upcall, void *cookie,
3279                      struct ldlm_enqueue_info *einfo,
3280                      struct lustre_handle *lockh,
3281                      struct ptlrpc_request_set *rqset, int async)
3282 {
3283         struct obd_device *obd = exp->exp_obd;
3284         struct ptlrpc_request *req = NULL;
3285         int intent = *flags & LDLM_FL_HAS_INTENT;
3286         ldlm_mode_t mode;
3287         int rc;
3288         ENTRY;
3289
3290         /* Filesystem lock extents are extended to page boundaries so that
3291          * dealing with the page cache is a little smoother.  */
3292         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3293         policy->l_extent.end |= ~CFS_PAGE_MASK;
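        /* Worked example (editorial, assuming CFS_PAGE_SIZE == 4096, so
         * ~CFS_PAGE_MASK == 4095): a byte range [5000, 9000] becomes
         * start = 5000 - (5000 & 4095) = 4096 and end = 9000 | 4095 = 12287,
         * i.e. the page-aligned extent [4096, 12287]. */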
3294
3295         /*
3296          * kms is not valid when either object is completely fresh (so that no
3297          * locks are cached), or object was evicted. In the latter case cached
3298          * lock cannot be used, because it would prime inode state with
3299          * potentially stale LVB.
3300          */
3301         if (!kms_valid)
3302                 goto no_match;
3303
3304         /* Next, search for already existing extent locks that will cover us */
3305         /* If we're trying to read, we also search for an existing PW lock.  The
3306          * VFS and page cache already protect us locally, so lots of readers/
3307          * writers can share a single PW lock.
3308          *
3309          * There are problems with conversion deadlocks, so instead of
3310          * converting a read lock to a write lock, we'll just enqueue a new
3311          * one.
3312          *
3313          * At some point we should cancel the read lock instead of making them
3314          * send us a blocking callback, but there are problems with canceling
3315          * locks out from other users right now, too. */
3316         mode = einfo->ei_mode;
3317         if (einfo->ei_mode == LCK_PR)
3318                 mode |= LCK_PW;
3319         mode = ldlm_lock_match(obd->obd_namespace,
3320                                *flags | LDLM_FL_LVB_READY, res_id,
3321                                einfo->ei_type, policy, mode, lockh, 0);
3322         if (mode) {
3323                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3324
3325                 if (matched->l_ast_data == NULL ||
3326                     matched->l_ast_data == einfo->ei_cbdata) {
3327                         /* addref the lock only if this is not an async
3328                          * request and PW matched when we asked for PR. */
3329                         if (!rqset && einfo->ei_mode != mode)
3330                                 ldlm_lock_addref(lockh, LCK_PR);
3331                         osc_set_lock_data_with_check(matched, einfo, *flags);
3332                         if (intent) {
3333                                 /* I would like to be able to ASSERT here that
3334                                  * rss <= kms, but I can't, for reasons which
3335                                  * are explained in lov_enqueue() */
3336                         }
3337
3338                         /* We already have a lock, and it's referenced */
3339                         (*upcall)(cookie, ELDLM_OK);
3340
3341                         /* For async requests, decref the lock. */
3342                         if (einfo->ei_mode != mode)
3343                                 ldlm_lock_decref(lockh, LCK_PW);
3344                         else if (rqset)
3345                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3346                         LDLM_LOCK_PUT(matched);
3347                         RETURN(ELDLM_OK);
3348                 } else
3349                         ldlm_lock_decref(lockh, mode);
3350                 LDLM_LOCK_PUT(matched);
3351         }
3352
3353  no_match:
3354         if (intent) {
3355                 CFS_LIST_HEAD(cancels);
3356                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3357                                            &RQF_LDLM_ENQUEUE_LVB);
3358                 if (req == NULL)
3359                         RETURN(-ENOMEM);
3360
3361                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3362                 if (rc)
3363                         RETURN(rc);
3364
3365                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3366                                      sizeof *lvb);
3367                 ptlrpc_request_set_replen(req);
3368         }
3369
3370         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3371         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3372
3373         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3374                               sizeof(*lvb), lockh, async);
3375         if (rqset) {
3376                 if (!rc) {
3377                         struct osc_enqueue_args *aa;
3378                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3379                         aa = ptlrpc_req_async_args(req);
3380                         aa->oa_ei = einfo;
3381                         aa->oa_exp = exp;
3382                         aa->oa_flags  = flags;
3383                         aa->oa_upcall = upcall;
3384                         aa->oa_cookie = cookie;
3385                         aa->oa_lvb    = lvb;
3386                         aa->oa_lockh  = lockh;
3387
3388                         req->rq_interpret_reply =
3389                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3390                         if (rqset == PTLRPCD_SET)
3391                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3392                         else
3393                                 ptlrpc_set_add_req(rqset, req);
3394                 } else if (intent) {
3395                         ptlrpc_req_finished(req);
3396                 }
3397                 RETURN(rc);
3398         }
3399
3400         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3401         if (intent)
3402                 ptlrpc_req_finished(req);
3403
3404         RETURN(rc);
3405 }
3406
3407 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3408                        struct ldlm_enqueue_info *einfo,
3409                        struct ptlrpc_request_set *rqset)
3410 {
3411         struct ldlm_res_id res_id;
3412         int rc;
3413         ENTRY;
3414
3415         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3416                            oinfo->oi_md->lsm_object_seq, &res_id);
3417
3418         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3419                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3420                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3421                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3422                               rqset, rqset != NULL);
3423         RETURN(rc);
3424 }
3425
3426 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3427                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3428                    int *flags, void *data, struct lustre_handle *lockh,
3429                    int unref)
3430 {
3431         struct obd_device *obd = exp->exp_obd;
3432         int lflags = *flags;
3433         ldlm_mode_t rc;
3434         ENTRY;
3435
3436         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3437                 RETURN(-EIO);
3438
3439         /* Filesystem lock extents are extended to page boundaries so that
3440          * dealing with the page cache is a little smoother */
3441         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3442         policy->l_extent.end |= ~CFS_PAGE_MASK;
3443
3444         /* Next, search for already existing extent locks that will cover us */
3445         /* If we're trying to read, we also search for an existing PW lock.  The
3446          * VFS and page cache already protect us locally, so lots of readers/
3447          * writers can share a single PW lock. */
3448         rc = mode;
3449         if (mode == LCK_PR)
3450                 rc |= LCK_PW;
3451         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3452                              res_id, type, policy, rc, lockh, unref);
3453         if (rc) {
3454                 if (data != NULL)
3455                         osc_set_data_with_check(lockh, data, lflags);
3456                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3457                         ldlm_lock_addref(lockh, LCK_PR);
3458                         ldlm_lock_decref(lockh, LCK_PW);
3459                 }
3460                 RETURN(rc);
3461         }
3462         RETURN(rc);
3463 }
3464
3465 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3466 {
3467         ENTRY;
3468
3469         if (unlikely(mode == LCK_GROUP))
3470                 ldlm_lock_decref_and_cancel(lockh, mode);
3471         else
3472                 ldlm_lock_decref(lockh, mode);
3473
3474         RETURN(0);
3475 }
3476
3477 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3478                       __u32 mode, struct lustre_handle *lockh)
3479 {
3480         ENTRY;
3481         RETURN(osc_cancel_base(lockh, mode));
3482 }
3483
3484 static int osc_cancel_unused(struct obd_export *exp,
3485                              struct lov_stripe_md *lsm,
3486                              ldlm_cancel_flags_t flags,
3487                              void *opaque)
3488 {
3489         struct obd_device *obd = class_exp2obd(exp);
3490         struct ldlm_res_id res_id, *resp = NULL;
3491
3492         if (lsm != NULL) {
3493                 resp = osc_build_res_name(lsm->lsm_object_id,
3494                                           lsm->lsm_object_seq, &res_id);
3495         }
3496
3497         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3498 }
3499
3500 static int osc_statfs_interpret(const struct lu_env *env,
3501                                 struct ptlrpc_request *req,
3502                                 struct osc_async_args *aa, int rc)
3503 {
3504         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3505         struct obd_statfs *msfs;
3506         __u64 used;
3507         ENTRY;
3508
3509         if (rc == -EBADR)
3510                 /* The request has in fact never been sent
3511                  * due to issues at a higher level (LOV).
3512                  * Exit immediately since the caller is
3513                  * aware of the problem and takes care
3514                  * of the cleanup. */
3515                 RETURN(rc);
3516
3517         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3518             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3519                 GOTO(out, rc = 0);
3520
3521         if (rc != 0)
3522                 GOTO(out, rc);
3523
3524         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3525         if (msfs == NULL) {
3526                 GOTO(out, rc = -EPROTO);
3527         }
3528
3529         /* Reinitialize the RDONLY and DEGRADED flags at the client
3530          * on each statfs, so they don't stay set permanently. */
3531         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3532
3533         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3534                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3535         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3536                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3537
3538         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3539                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3540         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3541                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3542
3543         /* Add a bit of hysteresis so this flag isn't continually flapping,
3544          * and ensure that new files don't get extremely fragmented due to
3545          * only a small amount of available space in the filesystem.
3546          * We want to set the NOSPC flag when there is less than ~0.1% free
3547          * and clear it when there is at least ~0.2% free space, so:
3548          *                   avail < ~0.1% max          max = avail + used
3549          *            1025 * avail < avail + used       used = blocks - free
3550          *            1024 * avail < used
3551          *            1024 * avail < blocks - free
3552          *                   avail < ((blocks - free) >> 10)
3553          *
3554          * On a very large disk, say 16TB, 0.1% will be 16GB.  We don't want
3555          * to lose that much space, so in those cases we report no space
3556          * left if there is less than 1GB left.                          */
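        /* Editorial example with hypothetical numbers: for 4KB blocks and
         * os_blocks = 1G (a 4TB filesystem), os_bfree = 8M and os_bavail = 6M,
         * used = min((1G - 8M) >> 10, 1 << 30) is about 1016K blocks, and
         * since os_bavail (6M blocks) is well above that, NOSPC stays clear. */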
3557         used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3558         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3559                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3560                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3561         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3562                           (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3563                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3564
3565         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3566
3567         *aa->aa_oi->oi_osfs = *msfs;
3568 out:
3569         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3570         RETURN(rc);
3571 }
3572
3573 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3574                             __u64 max_age, struct ptlrpc_request_set *rqset)
3575 {
3576         struct ptlrpc_request *req;
3577         struct osc_async_args *aa;
3578         int                    rc;
3579         ENTRY;
3580
3581         /* We could possibly pass max_age in the request (as an absolute
3582          * timestamp or a "seconds.usec ago") so the target can avoid doing
3583          * extra calls into the filesystem if that isn't necessary (e.g.
3584          * during mount that would help a bit).  Having relative timestamps
3585          * is not so great if request processing is slow, while absolute
3586          * timestamps are not ideal because they need time synchronization. */
3587         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3588         if (req == NULL)
3589                 RETURN(-ENOMEM);
3590
3591         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3592         if (rc) {
3593                 ptlrpc_request_free(req);
3594                 RETURN(rc);
3595         }
3596         ptlrpc_request_set_replen(req);
3597         req->rq_request_portal = OST_CREATE_PORTAL;
3598         ptlrpc_at_set_req_timeout(req);
3599
3600         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3601                 /* procfs requests should not block waiting for the stats,
3602                  * to avoid a deadlock */
3602                 req->rq_no_resend = 1;
3603                 req->rq_no_delay = 1;
3604         }
3605
3606         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3607         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3608         aa = ptlrpc_req_async_args(req);
3609         aa->aa_oi = oinfo;
3610
3611         ptlrpc_set_add_req(rqset, req);
3612         RETURN(0);
3613 }
3614
3615 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3616                       __u64 max_age, __u32 flags)
3617 {
3618         struct obd_statfs     *msfs;
3619         struct ptlrpc_request *req;
3620         struct obd_import     *imp = NULL;
3621         int rc;
3622         ENTRY;
3623
3624         /* Since the request might also come from lprocfs, we need to
3625          * sync this with client_disconnect_export() (bug 15684). */
3626         cfs_down_read(&obd->u.cli.cl_sem);
3627         if (obd->u.cli.cl_import)
3628                 imp = class_import_get(obd->u.cli.cl_import);
3629         cfs_up_read(&obd->u.cli.cl_sem);
3630         if (!imp)
3631                 RETURN(-ENODEV);
3632
3633         /* We could possibly pass max_age in the request (as an absolute
3634          * timestamp or a "seconds.usec ago") so the target can avoid doing
3635          * extra calls into the filesystem if that isn't necessary (e.g.
3636          * during mount that would help a bit).  Having relative timestamps
3637          * is not so great if request processing is slow, while absolute
3638          * timestamps are not ideal because they need time synchronization. */
3639         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3640
3641         class_import_put(imp);
3642
3643         if (req == NULL)
3644                 RETURN(-ENOMEM);
3645
3646         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3647         if (rc) {
3648                 ptlrpc_request_free(req);
3649                 RETURN(rc);
3650         }
3651         ptlrpc_request_set_replen(req);
3652         req->rq_request_portal = OST_CREATE_PORTAL;
3653         ptlrpc_at_set_req_timeout(req);
3654
3655         if (flags & OBD_STATFS_NODELAY) {
3656                 /* procfs requests should not block waiting for the stats,
3657                  * to avoid a deadlock */
3657                 req->rq_no_resend = 1;
3658                 req->rq_no_delay = 1;
3659         }
3660
3661         rc = ptlrpc_queue_wait(req);
3662         if (rc)
3663                 GOTO(out, rc);
3664
3665         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3666         if (msfs == NULL) {
3667                 GOTO(out, rc = -EPROTO);
3668         }
3669
3670         *osfs = *msfs;
3671
3672         EXIT;
3673  out:
3674         ptlrpc_req_finished(req);
3675         return rc;
3676 }
3677
3678 /* Retrieve object striping information.
3679  *
3680  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3681  * the maximum number of OST indices which will fit in the user buffer.
3682  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3683  */
3684 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3685 {
3686         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3687         struct lov_user_md_v3 lum, *lumk;
3688         struct lov_user_ost_data_v1 *lmm_objects;
3689         int rc = 0, lum_size;
3690         ENTRY;
3691
3692         if (!lsm)
3693                 RETURN(-ENODATA);
3694
3695         /* we only need the header part from user space to get lmm_magic and
3696          * lmm_stripe_count, (the header part is common to v1 and v3) */
3697         lum_size = sizeof(struct lov_user_md_v1);
3698         if (cfs_copy_from_user(&lum, lump, lum_size))
3699                 RETURN(-EFAULT);
3700
3701         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3702             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3703                 RETURN(-EINVAL);
3704
3705         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3706         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3707         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3708         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3709
3710         /* we can use lov_mds_md_size() to compute lum_size
3711          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3712         if (lum.lmm_stripe_count > 0) {
3713                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3714                 OBD_ALLOC(lumk, lum_size);
3715                 if (!lumk)
3716                         RETURN(-ENOMEM);
3717
3718                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3719                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3720                 else
3721                         lmm_objects = &(lumk->lmm_objects[0]);
3722                 lmm_objects->l_object_id = lsm->lsm_object_id;
3723         } else {
3724                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3725                 lumk = &lum;
3726         }
3727
3728         lumk->lmm_object_id = lsm->lsm_object_id;
3729         lumk->lmm_object_seq = lsm->lsm_object_seq;
3730         lumk->lmm_stripe_count = 1;
3731
3732         if (cfs_copy_to_user(lump, lumk, lum_size))
3733                 rc = -EFAULT;
3734
3735         if (lumk != &lum)
3736                 OBD_FREE(lumk, lum_size);
3737
3738         RETURN(rc);
3739 }
3740
3741
3742 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3743                          void *karg, void *uarg)
3744 {
3745         struct obd_device *obd = exp->exp_obd;
3746         struct obd_ioctl_data *data = karg;
3747         int err = 0;
3748         ENTRY;
3749
3750         if (!cfs_try_module_get(THIS_MODULE)) {
3751                 CERROR("Can't get module. Is it alive?\n");
3752                 return -EINVAL;
3753         }
3754         switch (cmd) {
3755         case OBD_IOC_LOV_GET_CONFIG: {
3756                 char *buf;
3757                 struct lov_desc *desc;
3758                 struct obd_uuid uuid;
3759
3760                 buf = NULL;
3761                 len = 0;
3762                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3763                         GOTO(out, err = -EINVAL);
3764
3765                 data = (struct obd_ioctl_data *)buf;
3766
3767                 if (sizeof(*desc) > data->ioc_inllen1) {
3768                         obd_ioctl_freedata(buf, len);
3769                         GOTO(out, err = -EINVAL);
3770                 }
3771
3772                 if (data->ioc_inllen2 < sizeof(uuid)) {
3773                         obd_ioctl_freedata(buf, len);
3774                         GOTO(out, err = -EINVAL);
3775                 }
3776
3777                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3778                 desc->ld_tgt_count = 1;
3779                 desc->ld_active_tgt_count = 1;
3780                 desc->ld_default_stripe_count = 1;
3781                 desc->ld_default_stripe_size = 0;
3782                 desc->ld_default_stripe_offset = 0;
3783                 desc->ld_pattern = 0;
3784                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3785
3786                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3787
3788                 err = cfs_copy_to_user((void *)uarg, buf, len);
3789                 if (err)
3790                         err = -EFAULT;
3791                 obd_ioctl_freedata(buf, len);
3792                 GOTO(out, err);
3793         }
3794         case LL_IOC_LOV_SETSTRIPE:
3795                 err = obd_alloc_memmd(exp, karg);
3796                 if (err > 0)
3797                         err = 0;
3798                 GOTO(out, err);
3799         case LL_IOC_LOV_GETSTRIPE:
3800                 err = osc_getstripe(karg, uarg);
3801                 GOTO(out, err);
3802         case OBD_IOC_CLIENT_RECOVER:
3803                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3804                                             data->ioc_inlbuf1);
3805                 if (err > 0)
3806                         err = 0;
3807                 GOTO(out, err);
3808         case IOC_OSC_SET_ACTIVE:
3809                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3810                                                data->ioc_offset);
3811                 GOTO(out, err);
3812         case OBD_IOC_POLL_QUOTACHECK:
3813                 err = lquota_poll_check(quota_interface, exp,
3814                                         (struct if_quotacheck *)karg);
3815                 GOTO(out, err);
3816         case OBD_IOC_PING_TARGET:
3817                 err = ptlrpc_obd_ping(obd);
3818                 GOTO(out, err);
3819         default:
3820                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3821                        cmd, cfs_curproc_comm());
3822                 GOTO(out, err = -ENOTTY);
3823         }
3824 out:
3825         cfs_module_put(THIS_MODULE);
3826         return err;
3827 }
3828
3829 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3830                         void *key, __u32 *vallen, void *val,
3831                         struct lov_stripe_md *lsm)
3832 {
3833         ENTRY;
3834         if (!vallen || !val)
3835                 RETURN(-EFAULT);
3836
3837         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3838                 __u32 *stripe = val;
3839                 *vallen = sizeof(*stripe);
3840                 *stripe = 0;
3841                 RETURN(0);
3842         } else if (KEY_IS(KEY_LAST_ID)) {
3843                 struct ptlrpc_request *req;
3844                 obd_id                *reply;
3845                 char                  *tmp;
3846                 int                    rc;
3847
3848                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3849                                            &RQF_OST_GET_INFO_LAST_ID);
3850                 if (req == NULL)
3851                         RETURN(-ENOMEM);
3852
3853                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3854                                      RCL_CLIENT, keylen);
3855                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3856                 if (rc) {
3857                         ptlrpc_request_free(req);
3858                         RETURN(rc);
3859                 }
3860
3861                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3862                 memcpy(tmp, key, keylen);
3863
3864                 req->rq_no_delay = req->rq_no_resend = 1;
3865                 ptlrpc_request_set_replen(req);
3866                 rc = ptlrpc_queue_wait(req);
3867                 if (rc)
3868                         GOTO(out, rc);
3869
3870                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3871                 if (reply == NULL)
3872                         GOTO(out, rc = -EPROTO);
3873
3874                 *((obd_id *)val) = *reply;
3875         out:
3876                 ptlrpc_req_finished(req);
3877                 RETURN(rc);
3878         } else if (KEY_IS(KEY_FIEMAP)) {
3879                 struct ptlrpc_request *req;
3880                 struct ll_user_fiemap *reply;
3881                 char *tmp;
3882                 int rc;
3883
3884                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3885                                            &RQF_OST_GET_INFO_FIEMAP);
3886                 if (req == NULL)
3887                         RETURN(-ENOMEM);
3888
3889                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3890                                      RCL_CLIENT, keylen);
3891                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3892                                      RCL_CLIENT, *vallen);
3893                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3894                                      RCL_SERVER, *vallen);
3895
3896                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3897                 if (rc) {
3898                         ptlrpc_request_free(req);
3899                         RETURN(rc);
3900                 }
3901
3902                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3903                 memcpy(tmp, key, keylen);
3904                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3905                 memcpy(tmp, val, *vallen);
3906
3907                 ptlrpc_request_set_replen(req);
3908                 rc = ptlrpc_queue_wait(req);
3909                 if (rc)
3910                         GOTO(out1, rc);
3911
3912                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3913                 if (reply == NULL)
3914                         GOTO(out1, rc = -EPROTO);
3915
3916                 memcpy(val, reply, *vallen);
3917         out1:
3918                 ptlrpc_req_finished(req);
3919
3920                 RETURN(rc);
3921         }
3922
3923         RETURN(-EINVAL);
3924 }
3925
3926 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3927 {
3928         struct llog_ctxt *ctxt;
3929         int rc = 0;
3930         ENTRY;
3931
3932         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3933         if (ctxt) {
3934                 rc = llog_initiator_connect(ctxt);
3935                 llog_ctxt_put(ctxt);
3936         } else {
3937                 /* XXX return an error? skip setting below flags? */
3938         }
3939
3940         cfs_spin_lock(&imp->imp_lock);
3941         imp->imp_server_timeout = 1;
3942         imp->imp_pingable = 1;
3943         cfs_spin_unlock(&imp->imp_lock);
3944         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3945
3946         RETURN(rc);
3947 }
3948
3949 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3950                                           struct ptlrpc_request *req,
3951                                           void *aa, int rc)
3952 {
3953         ENTRY;
3954         if (rc != 0)
3955                 RETURN(rc);
3956
3957         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3958 }
3959
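/* Handle obd_set_info_async() for the OSC.  Keys such as KEY_NEXT_ID,
 * KEY_CHECKSUM, KEY_SPTLRPC_CONF and KEY_FLUSH_CTX are consumed locally;
 * everything else is packed into an OST_SET_INFO RPC and sent to the OST.
 *
 * A hypothetical caller enabling checksums (a sketch only, not code from
 * the tree):
 *
 *      int on = 1;
 *      rc = obd_set_info_async(exp, strlen(KEY_CHECKSUM), KEY_CHECKSUM,
 *                              sizeof(on), &on, set);
 */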
3960 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3961                               void *key, obd_count vallen, void *val,
3962                               struct ptlrpc_request_set *set)
3963 {
3964         struct ptlrpc_request *req;
3965         struct obd_device     *obd = exp->exp_obd;
3966         struct obd_import     *imp = class_exp2cliimp(exp);
3967         char                  *tmp;
3968         int                    rc;
3969         ENTRY;
3970
3971         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3972
3973         if (KEY_IS(KEY_NEXT_ID)) {
3974                 obd_id new_val;
3975                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3976
3977                 if (vallen != sizeof(obd_id))
3978                         RETURN(-ERANGE);
3979                 if (val == NULL)
3980                         RETURN(-EINVAL);
3981
                /* Avoid a race between allocating a new object and setting
                 * the next id from the ll_sync thread. */
3987                 cfs_spin_lock(&oscc->oscc_lock);
3988                 new_val = *((obd_id*)val) + 1;
3989                 if (new_val > oscc->oscc_next_id)
3990                         oscc->oscc_next_id = new_val;
3991                 cfs_spin_unlock(&oscc->oscc_lock);
3992                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3993                        exp->exp_obd->obd_name,
3994                        obd->u.cli.cl_oscc.oscc_next_id);
3995
3996                 RETURN(0);
3997         }
3998
3999         if (KEY_IS(KEY_CHECKSUM)) {
4000                 if (vallen != sizeof(int))
4001                         RETURN(-EINVAL);
4002                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4003                 RETURN(0);
4004         }
4005
4006         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4007                 sptlrpc_conf_client_adapt(obd);
4008                 RETURN(0);
4009         }
4010
4011         if (KEY_IS(KEY_FLUSH_CTX)) {
4012                 sptlrpc_import_flush_my_ctx(imp);
4013                 RETURN(0);
4014         }
4015
4016         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4017                 RETURN(-EINVAL);
4018
        /* We pass all other commands directly to the OST. Since nobody calls
           OSC methods directly and everybody is supposed to go through LOV,
           we assume LOV has already rejected invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad gets through, we'd get -EINVAL from the OST
           anyway. */
4025
4026         if (KEY_IS(KEY_GRANT_SHRINK))
4027                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4028         else
4029                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4030
4031         if (req == NULL)
4032                 RETURN(-ENOMEM);
4033
4034         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4035                              RCL_CLIENT, keylen);
4036         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4037                              RCL_CLIENT, vallen);
4038         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4039         if (rc) {
4040                 ptlrpc_request_free(req);
4041                 RETURN(rc);
4042         }
4043
4044         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4045         memcpy(tmp, key, keylen);
4046         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4047         memcpy(tmp, val, vallen);
4048
4049         if (KEY_IS(KEY_MDS_CONN)) {
4050                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4051
4052                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4053                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4054                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4055                 req->rq_no_delay = req->rq_no_resend = 1;
4056                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4057         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4058                 struct osc_grant_args *aa;
4059                 struct obdo *oa;
4060
4061                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4062                 aa = ptlrpc_req_async_args(req);
4063                 OBD_ALLOC_PTR(oa);
4064                 if (!oa) {
4065                         ptlrpc_req_finished(req);
4066                         RETURN(-ENOMEM);
4067                 }
4068                 *oa = ((struct ost_body *)val)->oa;
4069                 aa->aa_oa = oa;
4070                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4071         }
4072
4073         ptlrpc_request_set_replen(req);
4074         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4075                 LASSERT(set != NULL);
4076                 ptlrpc_set_add_req(set, req);
4077                 ptlrpc_check_set(NULL, set);
4078         } else
4079                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4080
4081         RETURN(0);
4082 }
4083
4084
static struct llog_operations osc_size_repl_logops = {
        .lop_cancel = llog_obd_repl_cancel,
};
4088
4089 static struct llog_operations osc_mds_ost_orig_logops;
4090
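/* Set up the two llog contexts used by the OSC: the MDS->OST originator
 * log and the size-changes replicator log.  If the second setup fails,
 * the first context is cleaned up again before returning. */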
4091 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4092                            struct obd_device *tgt, struct llog_catid *catid)
4093 {
4094         int rc;
4095         ENTRY;
4096
4097         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4098                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4099         if (rc) {
4100                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4101                 GOTO(out, rc);
4102         }
4103
4104         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4105                         NULL, &osc_size_repl_logops);
4106         if (rc) {
4107                 struct llog_ctxt *ctxt =
4108                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4109                 if (ctxt)
4110                         llog_cleanup(ctxt);
4111                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4112         }
out:
4115         if (rc) {
4116                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4117                        obd->obd_name, tgt->obd_name, catid, rc);
4118                 CERROR("logid "LPX64":0x%x\n",
4119                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4120         }
4121         return rc;
4122 }
4123
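/* Fetch this target's catalog id from CATLIST, initialize the llog
 * contexts with it, and write the (possibly updated) id back.  Serialized
 * by the olg_cat_processing mutex. */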
4124 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4125                          struct obd_device *disk_obd, int *index)
4126 {
4127         struct llog_catid catid;
4128         static char name[32] = CATLIST;
4129         int rc;
4130         ENTRY;
4131
4132         LASSERT(olg == &obd->obd_olg);
4133
4134         cfs_mutex_down(&olg->olg_cat_processing);
4135         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4136         if (rc) {
4137                 CERROR("rc: %d\n", rc);
4138                 GOTO(out, rc);
4139         }
4140
4141         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4142                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4143                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4144
4145         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4146         if (rc) {
4147                 CERROR("rc: %d\n", rc);
4148                 GOTO(out, rc);
4149         }
4150
4151         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4152         if (rc) {
4153                 CERROR("rc: %d\n", rc);
4154                 GOTO(out, rc);
4155         }
4156
4157  out:
4158         cfs_mutex_up(&olg->olg_cat_processing);
4159
4160         return rc;
4161 }
4162
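/* Tear down both llog contexts; the first cleanup failure, if any, is the
 * one returned. */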
4163 static int osc_llog_finish(struct obd_device *obd, int count)
4164 {
4165         struct llog_ctxt *ctxt;
4166         int rc = 0, rc2 = 0;
4167         ENTRY;
4168
4169         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4170         if (ctxt)
4171                 rc = llog_cleanup(ctxt);
4172
4173         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4174         if (ctxt)
4175                 rc2 = llog_cleanup(ctxt);
4176         if (!rc)
4177                 rc = rc2;
4178
4179         RETURN(rc);
4180 }
4181
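/* On (re)connect, ask the server for a grant matching the client's
 * current usage (available grant plus dirty pages), or two full RPCs
 * worth if that is zero.  E.g., assuming no grant or dirty pages and 256
 * pages per RPC on 4K pages, the request is 2 * 256 * 4096 = 2MB. */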
4182 static int osc_reconnect(const struct lu_env *env,
4183                          struct obd_export *exp, struct obd_device *obd,
4184                          struct obd_uuid *cluuid,
4185                          struct obd_connect_data *data,
4186                          void *localdata)
4187 {
4188         struct client_obd *cli = &obd->u.cli;
4189
4190         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4191                 long lost_grant;
4192
4193                 client_obd_list_lock(&cli->cl_loi_list_lock);
4194                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4195                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4196                 lost_grant = cli->cl_lost_grant;
4197                 cli->cl_lost_grant = 0;
4198                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4199
4200                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4201                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4202                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4203                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4204                        " ocd_grant: %d\n", data->ocd_connect_flags,
4205                        data->ocd_version, data->ocd_grant);
4206         }
4207
4208         RETURN(0);
4209 }
4210
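/* Disconnect from the OST: flush any remaining llog cancels on the last
 * connection reference, disconnect the export, and only then remove this
 * client from the grant shrink list (see the BUG18662 note below). */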
4211 static int osc_disconnect(struct obd_export *exp)
4212 {
4213         struct obd_device *obd = class_exp2obd(exp);
4214         struct llog_ctxt  *ctxt;
4215         int rc;
4216
4217         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4218         if (ctxt) {
4219                 if (obd->u.cli.cl_conn_count == 1) {
4220                         /* Flush any remaining cancel messages out to the
4221                          * target */
4222                         llog_sync(ctxt, exp);
4223                 }
4224                 llog_ctxt_put(ctxt);
4225         } else {
4226                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4227                        obd);
4228         }
4229
4230         rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but
         * that causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together:
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! the pinger triggers the shrink.
         * So the OSC should be removed from the shrink list only after we
         * are sure the import has been destroyed. BUG18662
         */
4248         if (obd->u.cli.cl_import == NULL)
4249                 osc_del_shrink_grant(&obd->u.cli);
4250         return rc;
4251 }
4252
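/* React to import state changes: drop grants on disconnect, fail queued
 * pages and flush local locks on invalidation, clear the no-space flag on
 * activation, and (re)initialize grants when connect data arrives. */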
4253 static int osc_import_event(struct obd_device *obd,
4254                             struct obd_import *imp,
4255                             enum obd_import_event event)
4256 {
4257         struct client_obd *cli;
4258         int rc = 0;
4259
4260         ENTRY;
4261         LASSERT(imp->imp_obd == obd);
4262
4263         switch (event) {
4264         case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSCs */
4266                 if (imp->imp_server_timeout) {
4267                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4268
4269                         cfs_spin_lock(&oscc->oscc_lock);
4270                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4271                         cfs_spin_unlock(&oscc->oscc_lock);
4272                 }
4273                 cli = &obd->u.cli;
4274                 client_obd_list_lock(&cli->cl_loi_list_lock);
4275                 cli->cl_avail_grant = 0;
4276                 cli->cl_lost_grant = 0;
4277                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4278                 break;
4279         }
4280         case IMP_EVENT_INACTIVE: {
4281                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4282                 break;
4283         }
4284         case IMP_EVENT_INVALIDATE: {
4285                 struct ldlm_namespace *ns = obd->obd_namespace;
4286                 struct lu_env         *env;
4287                 int                    refcheck;
4288
4289                 env = cl_env_get(&refcheck);
4290                 if (!IS_ERR(env)) {
4291                         /* Reset grants */
4292                         cli = &obd->u.cli;
4293                         client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* All pages go to failing RPCs due to the invalid
                         * import. */
4296                         osc_check_rpcs(env, cli);
4297                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4298
4299                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4300                         cl_env_put(env, &refcheck);
4301                 } else
4302                         rc = PTR_ERR(env);
4303                 break;
4304         }
4305         case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSCs */
4307                 if (imp->imp_server_timeout) {
4308                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4309
4310                         cfs_spin_lock(&oscc->oscc_lock);
4311                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4312                         cfs_spin_unlock(&oscc->oscc_lock);
4313                 }
4314                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4315                 break;
4316         }
4317         case IMP_EVENT_OCD: {
4318                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4319
4320                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4321                         osc_init_grant(&obd->u.cli, ocd);
4322
4323                 /* See bug 7198 */
4324                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
4326
4327                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4328                 break;
4329         }
4330         default:
4331                 CERROR("Unknown import event %d\n", event);
4332                 LBUG();
4333         }
4334         RETURN(rc);
4335 }
4336
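/* Client obd setup: take a ptlrpcd reference, set up the generic client
 * obd, register lprocfs entries, and pre-allocate a request pool so that
 * brw_interpret can create new requests before freeing previous ones
 * (e.g., with cl_max_rpcs_in_flight == 8 the pool holds 10 requests of
 * OST_MAXREQSIZE bytes). */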
4337 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4338 {
4339         int rc;
        ENTRY;

4343         rc = ptlrpcd_addref();
4344         if (rc)
4345                 RETURN(rc);
4346
4347         rc = client_obd_setup(obd, lcfg);
4348         if (rc) {
4349                 ptlrpcd_decref();
4350         } else {
4351                 struct lprocfs_static_vars lvars = { 0 };
4352                 struct client_obd *cli = &obd->u.cli;
4353
4354                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4355                 lprocfs_osc_init_vars(&lvars);
4356                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4357                         lproc_osc_attach_seqstat(obd);
4358                         sptlrpc_lprocfs_cliobd_attach(obd);
4359                         ptlrpc_lprocfs_register_obd(obd);
4360                 }
4361
4362                 oscc_init(obd);
                /* We need to allocate a few extra requests, because
                   brw_interpret tries to create new requests before freeing
                   previous ones. Ideally we want 2x max_rpcs_in_flight
                   reserved, but that could waste too much RAM, so adding
                   just 2 is a guess that should still work. */
4368                 cli->cl_import->imp_rq_pool =
4369                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4370                                             OST_MAXREQSIZE,
4371                                             ptlrpc_add_rqs_to_pool);
4372
4373                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4374                 cfs_sema_init(&cli->cl_grant_sem, 1);
4375         }
4376
4377         RETURN(rc);
4378 }
4379
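/* Two-stage pre-cleanup: deactivate the import early to stop pings and
 * mds_lov synchronization, then invalidate and destroy it (along with the
 * llog contexts) when exports are cleaned up. */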
4380 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4381 {
4382         int rc = 0;
4383         ENTRY;
4384
4385         switch (stage) {
4386         case OBD_CLEANUP_EARLY: {
4387                 struct obd_import *imp;
4388                 imp = obd->u.cli.cl_import;
4389                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4390                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4391                 ptlrpc_deactivate_import(imp);
4392                 cfs_spin_lock(&imp->imp_lock);
4393                 imp->imp_pingable = 0;
4394                 cfs_spin_unlock(&imp->imp_lock);
4395                 break;
4396         }
4397         case OBD_CLEANUP_EXPORTS: {
4398                 /* If we set up but never connected, the
4399                    client import will not have been cleaned. */
4400                 if (obd->u.cli.cl_import) {
4401                         struct obd_import *imp;
4402                         cfs_down_write(&obd->u.cli.cl_sem);
4403                         imp = obd->u.cli.cl_import;
4404                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4405                                obd->obd_name);
4406                         ptlrpc_invalidate_import(imp);
4407                         if (imp->imp_rq_pool) {
4408                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4409                                 imp->imp_rq_pool = NULL;
4410                         }
4411                         class_destroy_import(imp);
4412                         cfs_up_write(&obd->u.cli.cl_sem);
4413                         obd->u.cli.cl_import = NULL;
4414                 }
4415                 rc = obd_llog_finish(obd, 0);
4416                 if (rc != 0)
4417                         CERROR("failed to cleanup llogging subsystems\n");
4418                 break;
        }
4420         }
4421         RETURN(rc);
4422 }
4423
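/* Final cleanup: unregister lprocfs entries, release the osc quota cache,
 * tear down the generic client obd, and drop the ptlrpcd reference taken
 * in osc_setup(). */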
4424 int osc_cleanup(struct obd_device *obd)
4425 {
4426         int rc;
4427
4428         ENTRY;
4429         ptlrpc_lprocfs_unregister_obd(obd);
4430         lprocfs_obd_cleanup(obd);
4431
4432         /* free memory of osc quota cache */
4433         lquota_cleanup(quota_interface, obd);
4434
4435         rc = client_obd_cleanup(obd);
4436
4437         ptlrpcd_decref();
4438         RETURN(rc);
4439 }
4440
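/* Process configuration commands; anything not handled explicitly is
 * treated as a proc parameter update. */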
4441 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4442 {
4443         struct lprocfs_static_vars lvars = { 0 };
4444         int rc = 0;
4445
4446         lprocfs_osc_init_vars(&lvars);
4447
4448         switch (lcfg->lcfg_command) {
4449         default:
4450                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4451                                               lcfg, obd);
4452                 if (rc > 0)
4453                         rc = 0;
4454                 break;
4455         }
4456
        return rc;
4458 }
4459
4460 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4461 {
4462         return osc_process_config_base(obd, buf);
4463 }
4464
4465 struct obd_ops osc_obd_ops = {
4466         .o_owner                = THIS_MODULE,
4467         .o_setup                = osc_setup,
4468         .o_precleanup           = osc_precleanup,
4469         .o_cleanup              = osc_cleanup,
4470         .o_add_conn             = client_import_add_conn,
4471         .o_del_conn             = client_import_del_conn,
4472         .o_connect              = client_connect_import,
4473         .o_reconnect            = osc_reconnect,
4474         .o_disconnect           = osc_disconnect,
4475         .o_statfs               = osc_statfs,
4476         .o_statfs_async         = osc_statfs_async,
4477         .o_packmd               = osc_packmd,
4478         .o_unpackmd             = osc_unpackmd,
4479         .o_precreate            = osc_precreate,
4480         .o_create               = osc_create,
4481         .o_create_async         = osc_create_async,
4482         .o_destroy              = osc_destroy,
4483         .o_getattr              = osc_getattr,
4484         .o_getattr_async        = osc_getattr_async,
4485         .o_setattr              = osc_setattr,
4486         .o_setattr_async        = osc_setattr_async,
4487         .o_brw                  = osc_brw,
4488         .o_punch                = osc_punch,
4489         .o_sync                 = osc_sync,
4490         .o_enqueue              = osc_enqueue,
4491         .o_change_cbdata        = osc_change_cbdata,
4492         .o_find_cbdata          = osc_find_cbdata,
4493         .o_cancel               = osc_cancel,
4494         .o_cancel_unused        = osc_cancel_unused,
4495         .o_iocontrol            = osc_iocontrol,
4496         .o_get_info             = osc_get_info,
4497         .o_set_info_async       = osc_set_info_async,
4498         .o_import_event         = osc_import_event,
4499         .o_llog_init            = osc_llog_init,
4500         .o_llog_finish          = osc_llog_finish,
4501         .o_process_config       = osc_process_config,
4502 };
4503
4504 extern struct lu_kmem_descr osc_caches[];
4505 extern cfs_spinlock_t       osc_ast_guard;
4506 extern cfs_lock_class_key_t osc_ast_guard_class;
4507
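/* Module entry point: initialize the OSC caches, hook up the quota
 * interface, and register the OSC device type together with its llog
 * operations. */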
4508 int __init osc_init(void)
4509 {
4510         struct lprocfs_static_vars lvars = { 0 };
4511         int rc;
4512         ENTRY;
4513
        /* Print the address of _any_ initialized kernel symbol from this
         * module, to allow debugging with a gdb that doesn't support data
         * symbols from modules. */
4517         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4518
        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

4521         lprocfs_osc_init_vars(&lvars);
4522
4523         cfs_request_module("lquota");
4524         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4525         lquota_init(quota_interface);
4526         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4527
4528         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4529                                  LUSTRE_OSC_NAME, &osc_device_type);
4530         if (rc) {
4531                 if (quota_interface)
4532                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4533                 lu_kmem_fini(osc_caches);
4534                 RETURN(rc);
4535         }
4536
4537         cfs_spin_lock_init(&osc_ast_guard);
4538         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4539
4540         osc_mds_ost_orig_logops = llog_lvfs_ops;
4541         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4542         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4543         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4544         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4545
4546         RETURN(rc);
4547 }
4548
4549 #ifdef __KERNEL__
4550 static void /*__exit*/ osc_exit(void)
4551 {
4552         lu_device_type_fini(&osc_device_type);
4553
4554         lquota_exit(quota_interface);
4555         if (quota_interface)
4556                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4557
4558         class_unregister_type(LUSTRE_OSC_NAME);
4559         lu_kmem_fini(osc_caches);
4560 }
4561
4562 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4563 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4564 MODULE_LICENSE("GPL");
4565
4566 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4567 #endif