/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_seq = le64_to_cpu(lmm->lmm_object_seq);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

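/* Interpret an OST_GETATTR reply: unpack the returned attributes into
 * oinfo->oi_oa and run the caller's oi_cb_up completion callback. */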
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

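/* Queue an asynchronous OST_GETATTR on @set; the reply is handled by
 * osc_getattr_interpret(). */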
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

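/* Synchronous getattr: send OST_GETATTR and wait for the reply. */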
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

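/* Synchronous setattr: send OST_SETATTR and wait for the reply. */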
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

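/* Interpret an OST_SETATTR (or OST_PUNCH) reply: unpack the returned
 * attributes and run the caller-supplied upcall. */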
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

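/* Send an OST_SETATTR asynchronously.  With a NULL @rqset the request is
 * handed to ptlrpcd and the reply ignored; otherwise @upcall/@cookie are
 * invoked from osc_setattr_interpret() when the reply arrives. */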
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PSCOPE_OTHER);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

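/* obd_setattr_async() entry point: wrap osc_setattr_async_base() with the
 * upcall and cookie taken from @oinfo. */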
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

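/* Create an object on the OST.  Allocates a stripe MD if the caller did not
 * supply one, sends OST_CREATE synchronously, and copies the new object
 * id/seq from the reply into the lsm. */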
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

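/* Send an OST_PUNCH (truncate) asynchronously; completion is reported
 * through @upcall/@cookie via osc_setattr_interpret(). */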
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

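/* obd_punch() entry point: encode the punched extent as size/blocks in the
 * obdo and hand off to osc_punch_base(). */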
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
}

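/* Ask the OST to flush the [start, end] byte range of an object to stable
 * storage (OST_SYNC), waiting for the reply. */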
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

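/* Completion callback for OST_DESTROY: drop the in-flight destroy count and
 * wake anyone throttled in osc_destroy(). */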
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

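/* Try to reserve one of the cl_max_rpcs_in_flight slots for a destroy RPC;
 * returns 1 if the request may be sent now, 0 if the sender must wait. */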
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code since the client cannot do anything at
 * all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * below cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

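/* Fill in the dirty/undirty/grant accounting fields of @oa so each bulk
 * request tells the OST how much dirty cache and grant this client holds. */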
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read() and cfs_atomic_inc() are not covered
                 * by a lock, thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

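/* If the sync_fs list has drained and a sync_fs waiter is registered (see
 * b=22176), run its upcall to report that this OSC is clean. */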
int osc_wake_sync_fs(struct client_obd *cli)
{
        ENTRY;
        if (cfs_list_empty(&cli->cl_loi_sync_fs_list) &&
            cli->cl_sf_wait.started) {
                cli->cl_sf_wait.sfw_upcall(cli->cl_sf_wait.sfw_oi, 0);
                cli->cl_sf_wait.started = 0;
        }
        RETURN(0);
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
                     obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

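/* Interpret the reply to a grant-shrink set_info RPC: on failure, return
 * the grant we tried to give back to the local available pool. */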
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

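/* Pack a quarter of the locally available grant into @oa with
 * OBD_FL_SHRINK_GRANT set, piggybacking the give-back on a bulk request
 * that is about to be sent anyway. */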
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

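/* Shrink the available grant down to @target, returning the surplus to the
 * OST through a KEY_GRANT_SHRINK set_info RPC. */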
int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int                  rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

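/* Periodic timeout callback: walk the grant-shrink list and return surplus
 * grant for every client that osc_should_shrink_grant() says qualifies. */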
static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

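/* Seed this client's grant accounting from the server's connect reply. */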
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expected to hold: if we've
         * been evicted, it's the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probably not running"
                      " with the patch from bug20278 (%ld)\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug20278 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

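/* Check the per-niobuf return codes and the total byte count in a BRW_WRITE
 * reply; negative niobuf rcs are returned to the caller, any other
 * inconsistency is a protocol error. */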
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        /* niobuf rcs are signed on the wire; use a signed type so that
         * the negative per-niobuf errors are actually detected below */
        __s32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

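/* Two brw_pages can share one remote niobuf only if they are contiguous in
 * the file and carry compatible flags. */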
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

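/* Compute the bulk checksum over the first @nob bytes of @pga, kmapping each
 * page in turn; OBD_FAIL hooks can deliberately corrupt the data or the
 * checksum to exercise the error paths. */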
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

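/* Build (but do not send) a bulk read/write request: pack the body, ioobj
 * and niobufs, attach the bulk descriptor, piggyback grant and checksum
 * information, and stash the async args for the reply handler. */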
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

1445 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1446                                 __u32 client_cksum, __u32 server_cksum, int nob,
1447                                 obd_count page_count, struct brw_page **pga,
1448                                 cksum_type_t client_cksum_type)
1449 {
1450         __u32 new_cksum;
1451         char *msg;
1452         cksum_type_t cksum_type;
1453
1454         if (server_cksum == client_cksum) {
1455                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1456                 return 0;
1457         }
1458
1459         /* If this is an mmapped file, it can be changed at any time */
1460         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1461                 return 1;
1462
1463         if (oa->o_valid & OBD_MD_FLFLAGS)
1464                 cksum_type = cksum_type_unpack(oa->o_flags);
1465         else
1466                 cksum_type = OBD_CKSUM_CRC32;
1467
1468         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1469                                       cksum_type);
1470
1471         if (cksum_type != client_cksum_type)
1472                 msg = "the server did not use the checksum type specified in "
1473                       "the original request - likely a protocol problem";
1474         else if (new_cksum == server_cksum)
1475                 msg = "changed on the client after we checksummed it - "
1476                       "likely false positive due to mmap IO (bug 11742)";
1477         else if (new_cksum == client_cksum)
1478                 msg = "changed in transit before arrival at OST";
1479         else
1480                 msg = "changed in transit AND doesn't match the original - "
1481                       "likely false positive due to mmap IO (bug 11742)";
1482
1483         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1484                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1485                            msg, libcfs_nid2str(peer->nid),
1486                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1487                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1488                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1489                            oa->o_id,
1490                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1491                            pga[0]->off,
1492                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1493         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1494                "client csum now %x\n", client_cksum, client_cksum_type,
1495                server_cksum, cksum_type, new_cksum);
1496         return 1;
1497 }
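
/* Diagnosis matrix used above, after recomputing the checksum locally:
 *   recomputed == server checksum -> page changed on the client after it
 *                                    was checksummed (typically mmap IO)
 *   recomputed == client checksum -> data changed in transit before the OST
 *   matches neither               -> changed in transit AND doesn't match
 * A cksum_type mismatch is reported separately as a protocol problem. */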
1498
1499 /* Note: rc enters this function as the number of bytes transferred */
1500 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1501 {
1502         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1503         const lnet_process_id_t *peer =
1504                         &req->rq_import->imp_connection->c_peer;
1505         struct client_obd *cli = aa->aa_cli;
1506         struct ost_body *body;
1507         __u32 client_cksum = 0;
1508         ENTRY;
1509
1510         if (rc < 0 && rc != -EDQUOT) {
1511                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1512                 RETURN(rc);
1513         }
1514
1515         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1516         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1517         if (body == NULL) {
1518                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1519                 RETURN(-EPROTO);
1520         }
1521
1522 #ifdef HAVE_QUOTA_SUPPORT
1523         /* set/clear the over-quota flag for a uid/gid */
1524         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1525             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1526                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1527
1528                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1529                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1530                        body->oa.o_flags);
1531                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1532                              body->oa.o_flags);
1533         }
1534 #endif
1535
1536         osc_update_grant(cli, body);
1537
1538         if (rc < 0)
1539                 RETURN(rc);
1540
1541         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1542                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1543
1544         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1545                 if (rc > 0) {
1546                         CERROR("Unexpected +ve rc %d\n", rc);
1547                         RETURN(-EPROTO);
1548                 }
1549                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1550
1551                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1552                         RETURN(-EAGAIN);
1553
1554                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1555                     check_write_checksum(&body->oa, peer, client_cksum,
1556                                          body->oa.o_cksum, aa->aa_requested_nob,
1557                                          aa->aa_page_count, aa->aa_ppga,
1558                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1559                         RETURN(-EAGAIN);
1560
1561                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1562                                      aa->aa_page_count, aa->aa_ppga);
1563                 GOTO(out, rc);
1564         }
1565
1566         /* The rest of this function executes only for OST_READs */
1567
1568         /* if unwrap_bulk failed, return -EAGAIN to retry */
1569         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1570         if (rc < 0)
1571                 GOTO(out, rc = -EAGAIN);
1572
1573         if (rc > aa->aa_requested_nob) {
1574                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1575                        aa->aa_requested_nob);
1576                 RETURN(-EPROTO);
1577         }
1578
1579         if (rc != req->rq_bulk->bd_nob_transferred) {
1580                 CERROR("Unexpected rc %d (%d transferred)\n",
1581                        rc, req->rq_bulk->bd_nob_transferred);
1582                 RETURN(-EPROTO);
1583         }
1584
1585         if (rc < aa->aa_requested_nob)
1586                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1587
1588         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1589                 static int cksum_counter;
1590                 __u32      server_cksum = body->oa.o_cksum;
1591                 char      *via;
1592                 char      *router;
1593                 cksum_type_t cksum_type;
1594
1595                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1596                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1597                 else
1598                         cksum_type = OBD_CKSUM_CRC32;
1599                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1600                                                  aa->aa_ppga, OST_READ,
1601                                                  cksum_type);
1602
1603                 if (peer->nid == req->rq_bulk->bd_sender) {
1604                         via = router = "";
1605                 } else {
1606                         via = " via ";
1607                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1608                 }
1609
1610                 if (server_cksum == ~0 && rc > 0) {
1611                         CERROR("Protocol error: server %s set the 'checksum' "
1612                                "bit, but didn't send a checksum.  Not fatal, "
1613                                "but please notify on http://bugzilla.lustre.org/\n",
1614                                libcfs_nid2str(peer->nid));
1615                 } else if (server_cksum != client_cksum) {
1616                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1617                                            "%s%s%s inode "DFID" object "
1618                                            LPU64"/"LPU64" extent "
1619                                            "["LPU64"-"LPU64"]\n",
1620                                            req->rq_import->imp_obd->obd_name,
1621                                            libcfs_nid2str(peer->nid),
1622                                            via, router,
1623                                            body->oa.o_valid & OBD_MD_FLFID ?
1624                                                 body->oa.o_parent_seq : (__u64)0,
1625                                            body->oa.o_valid & OBD_MD_FLFID ?
1626                                                 body->oa.o_parent_oid : 0,
1627                                            body->oa.o_valid & OBD_MD_FLFID ?
1628                                                 body->oa.o_parent_ver : 0,
1629                                            body->oa.o_id,
1630                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1631                                                 body->oa.o_seq : (__u64)0,
1632                                            aa->aa_ppga[0]->off,
1633                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1634                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1635                                                                         1);
1636                         CERROR("client %x, server %x, cksum_type %x\n",
1637                                client_cksum, server_cksum, cksum_type);
1638                         cksum_counter = 0;
1639                         aa->aa_oa->o_cksum = client_cksum;
1640                         rc = -EAGAIN;
1641                 } else {
1642                         cksum_counter++;
1643                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1644                         rc = 0;
1645                 }
1646         } else if (unlikely(client_cksum)) {
1647                 static int cksum_missed;
1648
1649                 cksum_missed++;
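                /* log only when cksum_missed is a power of two: exponential backoff */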
1650                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1651                         CERROR("Checksum %u requested from %s but not sent\n",
1652                                cksum_missed, libcfs_nid2str(peer->nid));
1653         } else {
1654                 rc = 0;
1655         }
1656 out:
1657         if (rc >= 0)
1658                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1659
1660         RETURN(rc);
1661 }
1662
1663 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1664                             struct lov_stripe_md *lsm,
1665                             obd_count page_count, struct brw_page **pga,
1666                             struct obd_capa *ocapa)
1667 {
1668         struct ptlrpc_request *req;
1669         int                    rc;
1670         cfs_waitq_t            waitq;
1671         int                    resends = 0;
1672         struct l_wait_info     lwi;
1673
1674         ENTRY;
1675
1676         cfs_waitq_init(&waitq);
1677
1678 restart_bulk:
1679         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1680                                   page_count, pga, &req, ocapa, 0);
1681         if (rc != 0)
1682                 RETURN(rc);
1683
1684         rc = ptlrpc_queue_wait(req);
1685
1686         if (rc == -ETIMEDOUT && req->rq_resend) {
1687                 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1688                 ptlrpc_req_finished(req);
1689                 goto restart_bulk;
1690         }
1691
1692         rc = osc_brw_fini_request(req, rc);
1693
1694         ptlrpc_req_finished(req);
1695         if (osc_recoverable_error(rc)) {
1696                 resends++;
1697                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1698                         CERROR("too many resend retries, returning error\n");
1699                         RETURN(-EIO);
1700                 }
1701
1702                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1703                 l_wait_event(waitq, 0, &lwi);
1704
1705                 goto restart_bulk;
1706         }
1707
1708         RETURN(rc);
1709 }
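
/*
 * Illustrative sketch (not part of the I/O path): the retry policy of
 * osc_brw_internal() above, reduced to plain C.  The callback names are
 * invented for the example; each recoverable failure waits `resends`
 * seconds before retrying (linear backoff, as the LWI_TIMEOUT_INTR wait
 * above does), up to a resend limit.
 */
static int example_retry_brw(int (*try_once)(void),
                             void (*backoff)(int seconds), int max_resends)
{
        int rc;
        int resends = 0;

        while ((rc = try_once()) != 0) {
                if (++resends > max_resends)
                        return rc;              /* too many resends: give up */
                backoff(resends);               /* wait `resends` seconds */
        }
        return 0;
}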
1710
1711 int osc_brw_redo_request(struct ptlrpc_request *request,
1712                          struct osc_brw_async_args *aa)
1713 {
1714         struct ptlrpc_request *new_req;
1715         struct ptlrpc_request_set *set = request->rq_set;
1716         struct osc_brw_async_args *new_aa;
1717         struct osc_async_page *oap;
1718         int rc = 0;
1719         ENTRY;
1720
1721         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1722                 CERROR("too many resend retries, returning error\n");
1723                 RETURN(-EIO);
1724         }
1725
1726         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1727
1728         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1729                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1730                                   aa->aa_cli, aa->aa_oa,
1731                                   NULL /* lsm unused by osc currently */,
1732                                   aa->aa_page_count, aa->aa_ppga,
1733                                   &new_req, aa->aa_ocapa, 0);
1734         if (rc)
1735                 RETURN(rc);
1736
1737         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1738
1739         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1740                 if (oap->oap_request != NULL) {
1741                         LASSERTF(request == oap->oap_request,
1742                                  "request %p != oap_request %p\n",
1743                                  request, oap->oap_request);
1744                         if (oap->oap_interrupted) {
1745                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1746                                 ptlrpc_req_finished(new_req);
1747                                 RETURN(-EINTR);
1748                         }
1749                 }
1750         }
1751         /* The new request takes over pga and oaps from the old request.
1752          * Note that copying a list_head doesn't work; it has to be moved. */
1753         aa->aa_resends++;
1754         new_req->rq_interpret_reply = request->rq_interpret_reply;
1755         new_req->rq_async_args = request->rq_async_args;
1756         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1757
1758         new_aa = ptlrpc_req_async_args(new_req);
1759
1760         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1761         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1762         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1763
1764         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1765                 if (oap->oap_request) {
1766                         ptlrpc_req_finished(oap->oap_request);
1767                         oap->oap_request = ptlrpc_request_addref(new_req);
1768                 }
1769         }
1770
1771         new_aa->aa_ocapa = aa->aa_ocapa;
1772         aa->aa_ocapa = NULL;
1773
1774         /* Using ptlrpc_set_add_req() here is safe because interpret
1775          * functions run in check_set context.  The only path by which
1776          * another thread can reach this request and return -EINTR is
1777          * protected by cl_loi_list_lock. */
1778         ptlrpc_set_add_req(set, new_req);
1779
1780         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1781
1782         DEBUG_REQ(D_INFO, new_req, "new request");
1783         RETURN(0);
1784 }
1785
1786 /*
1787  * We want disk allocation on the target to happen in offset order, so we
1788  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it
1789  * does fine for our small page arrays and doesn't require allocation.  It's
1790  * an insertion sort that swaps elements that are strides apart, shrinking
1791  * the stride down until it is 1 and the array is sorted.
1792  */
1793 static void sort_brw_pages(struct brw_page **array, int num)
1794 {
1795         int stride, i, j;
1796         struct brw_page *tmp;
1797
1798         if (num == 1)
1799                 return;
1800         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1801                 ;
1802
1803         do {
1804                 stride /= 3;
1805                 for (i = stride ; i < num ; i++) {
1806                         tmp = array[i];
1807                         j = i;
1808                         while (j >= stride && array[j - stride]->off > tmp->off) {
1809                                 array[j] = array[j - stride];
1810                                 j -= stride;
1811                         }
1812                         array[j] = tmp;
1813                 }
1814         } while (stride > 1);
1815 }
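
/*
 * Illustrative sketch (not part of the I/O path): the same 3x+1 shellsort
 * applied to a bare array of 64-bit offsets, handy for seeing the stride
 * sequence 1, 4, 13, 40, ... in isolation.  Name and types are invented
 * for the example.
 */
static void example_shellsort_offsets(unsigned long long *a, int num)
{
        int stride, i, j;
        unsigned long long tmp;

        if (num <= 1)
                return;
        /* grow the stride just past num: 1, 4, 13, 40, ... */
        for (stride = 1; stride < num; stride = (stride * 3) + 1)
                ;
        do {
                stride /= 3;
                /* insertion sort among elements that are `stride` apart */
                for (i = stride; i < num; i++) {
                        tmp = a[i];
                        for (j = i; j >= stride && a[j - stride] > tmp;
                             j -= stride)
                                a[j] = a[j - stride];
                        a[j] = tmp;
                }
        } while (stride > 1);
}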
1816
1817 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1818 {
1819         int count = 1;
1820         int offset;
1821         int i = 0;
1822
1823         LASSERT(pages > 0);
1824         offset = pg[i]->off & ~CFS_PAGE_MASK;
1825
1826         for (;;) {
1827                 pages--;
1828                 if (pages == 0)         /* that's all */
1829                         return count;
1830
1831                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1832                         return count;   /* doesn't end on page boundary */
1833
1834                 i++;
1835                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1836                 if (offset != 0)        /* doesn't start on page boundary */
1837                         return count;
1838
1839                 count++;
1840         }
1841 }
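
/*
 * Illustrative sketch (not part of the I/O path): the same "unfragmented
 * prefix" rule as max_unfragmented_pages() above, over parallel
 * off[]/count[] arrays and assuming 4096-byte pages.  Names are invented
 * for the example.
 */
static int example_unfragmented_prefix(const unsigned long long *off,
                                       const unsigned int *count, int pages)
{
        const unsigned long long mask = 4096 - 1;
        int n;

        for (n = 1; n < pages; n++) {
                /* the previous fragment must run up to a page boundary... */
                if ((off[n - 1] & mask) + count[n - 1] != 4096)
                        break;
                /* ...and the next must start exactly on a page boundary */
                if ((off[n] & mask) != 0)
                        break;
        }
        return n;
}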
1842
1843 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1844 {
1845         struct brw_page **ppga;
1846         int i;
1847
1848         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1849         if (ppga == NULL)
1850                 return NULL;
1851
1852         for (i = 0; i < count; i++)
1853                 ppga[i] = pga + i;
1854         return ppga;
1855 }
1856
1857 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1858 {
1859         LASSERT(ppga != NULL);
1860         OBD_FREE(ppga, sizeof(*ppga) * count);
1861 }
1862
1863 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1864                    obd_count page_count, struct brw_page *pga,
1865                    struct obd_trans_info *oti)
1866 {
1867         struct obdo *saved_oa = NULL;
1868         struct brw_page **ppga, **orig;
1869         struct obd_import *imp = class_exp2cliimp(exp);
1870         struct client_obd *cli;
1871         int rc, page_count_orig;
1872         ENTRY;
1873
1874         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1875         cli = &imp->imp_obd->u.cli;
1876
1877         if (cmd & OBD_BRW_CHECK) {
1878                 /* The caller just wants to know if there's a chance that this
1879                  * I/O can succeed */
1880
1881                 if (imp->imp_invalid)
1882                         RETURN(-EIO);
1883                 RETURN(0);
1884         }
1885
1886         /* test_brw with a failed create can trip this, maybe others. */
1887         LASSERT(cli->cl_max_pages_per_rpc);
1888
1889         rc = 0;
1890
1891         orig = ppga = osc_build_ppga(pga, page_count);
1892         if (ppga == NULL)
1893                 RETURN(-ENOMEM);
1894         page_count_orig = page_count;
1895
1896         sort_brw_pages(ppga, page_count);
1897         while (page_count) {
1898                 obd_count pages_per_brw;
1899
1900                 if (page_count > cli->cl_max_pages_per_rpc)
1901                         pages_per_brw = cli->cl_max_pages_per_rpc;
1902                 else
1903                         pages_per_brw = page_count;
1904
1905                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1906
1907                 if (saved_oa != NULL) {
1908                         /* restore previously saved oa */
1909                         *oinfo->oi_oa = *saved_oa;
1910                 } else if (page_count > pages_per_brw) {
1911                         /* save a copy of oa (brw will clobber it) */
1912                         OBDO_ALLOC(saved_oa);
1913                         if (saved_oa == NULL)
1914                                 GOTO(out, rc = -ENOMEM);
1915                         *saved_oa = *oinfo->oi_oa;
1916                 }
1917
1918                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1919                                       pages_per_brw, ppga, oinfo->oi_capa);
1920
1921                 if (rc != 0)
1922                         break;
1923
1924                 page_count -= pages_per_brw;
1925                 ppga += pages_per_brw;
1926         }
1927
1928 out:
1929         osc_release_ppga(orig, page_count_orig);
1930
1931         if (saved_oa != NULL)
1932                 OBDO_FREE(saved_oa);
1933
1934         RETURN(rc);
1935 }
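
/*
 * Illustrative sketch (not part of the I/O path): the chunking policy
 * osc_brw() applies above -- cap each chunk at max_ppr pages, then shrink
 * it to the longest gap-free prefix so a single RDMA can carry it.  The
 * callback names are invented for the example.
 */
static int example_issue_in_chunks(int page_count, int max_ppr,
                                   int (*unfragmented)(int first, int max),
                                   int (*issue)(int first, int npages))
{
        int first = 0;

        while (page_count > 0) {
                int n = page_count > max_ppr ? max_ppr : page_count;
                int rc;

                n = unfragmented(first, n);     /* never split across a gap */
                rc = issue(first, n);
                if (rc != 0)
                        return rc;              /* stop on the first error */
                first += n;
                page_count -= n;
        }
        return 0;
}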
1936
1937 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1938  * the dirty accounting: either writeback completed, or a truncate arrived
1939  * before writing started.  Must be called with the loi lock held. */
1940 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1941                            int sent)
1942 {
1943         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1944 }
1945
1946 static int lop_makes_syncfs_rpc(struct loi_oap_pages *lop)
1947 {
1948         struct osc_async_page *oap;
1949         ENTRY;
1950
1951         if (cfs_list_empty(&lop->lop_urgent))
1952                 RETURN(0);
1953
1954         oap = cfs_list_entry(lop->lop_urgent.next,
1955                              struct osc_async_page, oap_urgent_item);
1956
1957         if (oap->oap_async_flags & ASYNC_SYNCFS) {
1958                 CDEBUG(D_CACHE, "syncfs request forcing RPC\n");
1959                 RETURN(1);
1960         }
1961
1962         RETURN(0);
1963 }
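
/* The check above mirrors lop_makes_hprpc() below, keyed on ASYNC_SYNCFS
 * instead of ASYNC_HP: a sync_fs page at the head of lop_urgent is enough
 * to force an RPC, so sync_fs writeback is not held up by normal batching. */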
1964
1965 /* This maintains the lists of pending pages to read/write for a given object
1966  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1967  * to quickly find objects that are ready to send an RPC. */
1968 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1969                          int cmd)
1970 {
1971         int optimal;
1972         ENTRY;
1973
1974         if (lop->lop_num_pending == 0)
1975                 RETURN(0);
1976
1977         /* if we have an invalid import we want to drain the queued pages
1978          * by forcing them through rpcs that immediately fail and complete
1979          * the pages.  recovery relies on this to empty the queued pages
1980          * before canceling the locks and evicting the llite pages */
1981         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1982                 RETURN(1);
1983
1984         /* stream rpcs in queue order as long as there is an urgent page
1985          * queued.  this is our cheap solution for good batching in the case
1986          * where writepage marks some random page in the middle of the file
1987          * as urgent because of, say, memory pressure */
1988         if (!cfs_list_empty(&lop->lop_urgent)) {
1989                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1990                 RETURN(1);
1991         }
1992         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1993         optimal = cli->cl_max_pages_per_rpc;
1994         if (cmd & OBD_BRW_WRITE) {
1995                 /* trigger a write rpc stream as long as there are dirtiers
1996                  * waiting for space.  as they're waiting, they're not going to
1997                  * create more pages to coalesce with what's waiting. */
1998                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1999                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2000                         RETURN(1);
2001                 }
2002                 /* +16 to avoid triggering rpcs that would want to include pages
2003                  * that are being queued but which can't be made ready until
2004                  * the queuer finishes with the page. this is a wart for
2005                  * llite::commit_write() */
2006                 optimal += 16;
2007         }
2008         if (lop->lop_num_pending >= optimal)
2009                 RETURN(1);
2010
2011         RETURN(0);
2012 }
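
/*
 * Summary of the triggers above -- an RPC is warranted when:
 *   - the import is invalid (drain everything through failing RPCs),
 *   - an urgent page sits on lop_urgent,
 *   - for writes, someone is blocked on the cache-waiters list, or
 *   - lop_num_pending reaches the "optimal" batch size
 *     (cl_max_pages_per_rpc, plus 16 for writes).
 */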
2013
2014 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2015 {
2016         struct osc_async_page *oap;
2017         ENTRY;
2018
2019         if (cfs_list_empty(&lop->lop_urgent))
2020                 RETURN(0);
2021
2022         oap = cfs_list_entry(lop->lop_urgent.next,
2023                          struct osc_async_page, oap_urgent_item);
2024
2025         if (oap->oap_async_flags & ASYNC_HP) {
2026                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2027                 RETURN(1);
2028         }
2029
2030         RETURN(0);
2031 }
2032
2033 static void on_list(cfs_list_t *item, cfs_list_t *list,
2034                     int should_be_on)
2035 {
2036         if (cfs_list_empty(item) && should_be_on)
2037                 cfs_list_add_tail(item, list);
2038         else if (!cfs_list_empty(item) && !should_be_on)
2039                 cfs_list_del_init(item);
2040 }
2041
2042 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2043  * can find pages to build into rpcs quickly */
2044 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2045 {
2046         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2047             lop_makes_hprpc(&loi->loi_read_lop)) {
2048                 /* HP rpc */
2049                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2050                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2051         } else {
2052                 if (lop_makes_syncfs_rpc(&loi->loi_write_lop)) {
2053                         on_list(&loi->loi_sync_fs_item,
2054                                 &cli->cl_loi_sync_fs_list,
2055                                 loi->loi_write_lop.lop_num_pending);
2056                 } else {
2057                         on_list(&loi->loi_hp_ready_item,
2058                                 &cli->cl_loi_hp_ready_list, 0);
2059                         on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2060                                 lop_makes_rpc(cli, &loi->loi_write_lop,
2061                                               OBD_BRW_WRITE)||
2062                                 lop_makes_rpc(cli, &loi->loi_read_lop,
2063                                               OBD_BRW_READ));
2064                 }
2065         }
2066
2067         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2068                 loi->loi_write_lop.lop_num_pending);
2069
2070         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2071                 loi->loi_read_lop.lop_num_pending);
2072 }
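
/* List membership after loi_list_maint(): HP work puts the loi on
 * cl_loi_hp_ready_list; otherwise a pending sync_fs write puts it on
 * cl_loi_sync_fs_list; otherwise it is on cl_loi_ready_list iff either lop
 * would make an RPC.  Independently, loi_write_item and loi_read_item track
 * whether any write or read pages are pending at all. */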
2073
2074 static void lop_update_pending(struct client_obd *cli,
2075                                struct loi_oap_pages *lop, int cmd, int delta)
2076 {
2077         lop->lop_num_pending += delta;
2078         if (cmd & OBD_BRW_WRITE)
2079                 cli->cl_pending_w_pages += delta;
2080         else
2081                 cli->cl_pending_r_pages += delta;
2082 }
2083
2084 /**
2085  * This is called when a sync waiter receives an interruption.  Its job is to
2086  * get the caller woken as soon as possible.  If its page hasn't been put in
2087  * an rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc
2088  * as desiring interruption, which will forcefully complete the rpc once the
2089  * rpc has timed out.
2090  */
2091 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2092 {
2093         struct loi_oap_pages *lop;
2094         struct lov_oinfo *loi;
2095         int rc = -EBUSY;
2096         ENTRY;
2097
2098         LASSERT(!oap->oap_interrupted);
2099         oap->oap_interrupted = 1;
2100
2101         /* ok, it's been put in an rpc. only one oap gets a request reference */
2102         if (oap->oap_request != NULL) {
2103                 ptlrpc_mark_interrupted(oap->oap_request);
2104                 ptlrpcd_wake(oap->oap_request);
2105                 ptlrpc_req_finished(oap->oap_request);
2106                 oap->oap_request = NULL;
2107         }
2108
2109         /*
2110          * page completion may be called only if the ->cpo_prep() method was
2111          * run by osc_io_submit(), which also adds the page to the pending list
2112          */
2113         if (!cfs_list_empty(&oap->oap_pending_item)) {
2114                 cfs_list_del_init(&oap->oap_pending_item);
2115                 cfs_list_del_init(&oap->oap_urgent_item);
2116
2117                 loi = oap->oap_loi;
2118                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2119                         &loi->loi_write_lop : &loi->loi_read_lop;
2120                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2121                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2122                 rc = oap->oap_caller_ops->ap_completion(env,
2123                                           oap->oap_caller_data,
2124                                           oap->oap_cmd, NULL, -EINTR);
2125         }
2126
2127         RETURN(rc);
2128 }
2129
2130 /* This propagates async writeback errors back up to the application.  When
2131  * an async write fails we record the error code so a later fsync can return
2132  * it.  As long as errors persist we force future rpcs to be sync, so that
2133  * the app can get a sync error and break the cycle of queueing pages for
2134  * which writeback will fail. */
2135 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2136                            int rc)
2137 {
2138         if (rc) {
2139                 if (!ar->ar_rc)
2140                         ar->ar_rc = rc;
2141
2142                 ar->ar_force_sync = 1;
2143                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2144                 return;
2146         }
2147
2148         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2149                 ar->ar_force_sync = 0;
2150 }
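
/*
 * Example timeline for the latch above (xids increase monotonically):
 *   write xid 10 fails    -> ar_rc recorded, ar_force_sync = 1,
 *                            ar_min_xid = next xid to be assigned (say 12)
 *   write xid 11 succeeds -> still forced sync: 11 < 12, so it was issued
 *                            before the failure was recorded
 *   write xid 12 succeeds -> ar_force_sync cleared; ar_rc is kept until an
 *                            fsync reports it
 */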
2151
2152 static int osc_add_to_lop_urgent(struct loi_oap_pages *lop,
2153                                  struct osc_async_page *oap,
2154                                  obd_flag async_flags)
2155 {
2157         /* if the oap is already on the urgent list, don't queue it twice */
2158         if (!cfs_list_empty(&oap->oap_urgent_item)) {
2159                 CWARN("Request to add duplicate oap_urgent for flag = %d\n",
2160                        oap->oap_async_flags);
2161                 return 1;
2162         }
2163
2164         /* sync_fs items always go to the tail of the urgent list */
2165         if (async_flags & ASYNC_SYNCFS) {
2166                 cfs_list_add_tail(&oap->oap_urgent_item,
2167                                   &lop->lop_urgent);
2168                 return 0;
2169         }
2170
2171         if (oap->oap_async_flags & ASYNC_HP)
2172                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2173         else if (oap->oap_async_flags & ASYNC_URGENT ||
2174                  async_flags & ASYNC_URGENT)
2175                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2176
2177         return 0;
2178 }
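
/* Queueing order recap for the helper above: ASYNC_HP pages go to the head
 * of lop_urgent so they are dispatched first; ASYNC_URGENT and ASYNC_SYNCFS
 * pages go to the tail. */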
2179
2180 void osc_oap_to_pending(struct osc_async_page *oap)
2181 {
2182         struct loi_oap_pages *lop;
2183
2184         if (oap->oap_cmd & OBD_BRW_WRITE)
2185                 lop = &oap->oap_loi->loi_write_lop;
2186         else
2187                 lop = &oap->oap_loi->loi_read_lop;
2188
2189         osc_add_to_lop_urgent(lop, oap, 0);
2190         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2191         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2192 }
2193
2194 /* this must be called holding the loi list lock, which covers exit_cache,
2195  * async_flag maintenance, and oap_request */
2196 static void osc_ap_completion(const struct lu_env *env,
2197                               struct client_obd *cli, struct obdo *oa,
2198                               struct osc_async_page *oap, int sent, int rc)
2199 {
2200         __u64 xid = 0;
2201
2202         ENTRY;
2203         if (oap->oap_request != NULL) {
2204                 xid = ptlrpc_req_xid(oap->oap_request);
2205                 ptlrpc_req_finished(oap->oap_request);
2206                 oap->oap_request = NULL;
2207         }
2208
2209         cfs_spin_lock(&oap->oap_lock);
2210         oap->oap_async_flags = 0;
2211         cfs_spin_unlock(&oap->oap_lock);
2212         oap->oap_interrupted = 0;
2213
2214         if (oap->oap_cmd & OBD_BRW_WRITE) {
2215                 osc_process_ar(&cli->cl_ar, xid, rc);
2216                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2217         }
2218
2219         if (rc == 0 && oa != NULL) {
2220                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2221                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2222                 if (oa->o_valid & OBD_MD_FLMTIME)
2223                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2224                 if (oa->o_valid & OBD_MD_FLATIME)
2225                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2226                 if (oa->o_valid & OBD_MD_FLCTIME)
2227                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2228         }
2229
2230         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2231                                                 oap->oap_cmd, oa, rc);
2232
2233         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2234          * I/O on the page could start, but OSC calls it under lock
2235          * and thus we can add oap back to pending safely */
2236         if (rc)
2237                 /* upper layer wants to leave the page on pending queue */
2238                 osc_oap_to_pending(oap);
2239         else
2240                 osc_exit_cache(cli, oap, sent);
2241         EXIT;
2242 }
2243
2244 static int brw_interpret(const struct lu_env *env,
2245                          struct ptlrpc_request *req, void *data, int rc)
2246 {
2247         struct osc_brw_async_args *aa = data;
2248         struct client_obd *cli;
2249         int async;
2250         ENTRY;
2251
2252         rc = osc_brw_fini_request(req, rc);
2253         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2254         if (osc_recoverable_error(rc)) {
2255                 /* Only retry once for mmapped files, since an mmapped page
2256                  * may be modified at any time. We still retry at least
2257                  * once, in case the page really WAS corrupted on the
2258                  * network rather than changed by mmap() after it was
2259                  * checksummed. Bug 11742 */
2260                 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2261                     aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2262                     aa->aa_oa->o_flags & OBD_FL_MMAP) {
2263                         rc = 0;
2264                 } else {
2265                         rc = osc_brw_redo_request(req, aa);
2266                         if (rc == 0)
2267                                 RETURN(0);
2268                 }
2269         }
2270
2271         if (aa->aa_ocapa) {
2272                 capa_put(aa->aa_ocapa);
2273                 aa->aa_ocapa = NULL;
2274         }
2275
2276         cli = aa->aa_cli;
2277
2278         client_obd_list_lock(&cli->cl_loi_list_lock);
2279
2280         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2281          * is called so we know whether to go to sync BRWs or wait for more
2282          * RPCs to complete */
2283         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2284                 cli->cl_w_in_flight--;
2285         else
2286                 cli->cl_r_in_flight--;
2287
2288         async = cfs_list_empty(&aa->aa_oaps);
2289         if (!async) { /* from osc_send_oap_rpc() */
2290                 struct osc_async_page *oap, *tmp;
2291                 /* the caller may re-use the oap after the completion call so
2292                  * we need to clean it up a little */
2293                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2294                                              oap_rpc_item) {
2295                         cfs_list_del_init(&oap->oap_rpc_item);
2296                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2297                 }
2298                 OBDO_FREE(aa->aa_oa);
2299         } else { /* from async_internal() */
2300                 obd_count i;
2301                 for (i = 0; i < aa->aa_page_count; i++)
2302                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2303         }
2304         osc_wake_cache_waiters(cli);
2305         osc_wake_sync_fs(cli);
2306         osc_check_rpcs(env, cli);
2307         client_obd_list_unlock(&cli->cl_loi_list_lock);
2308         if (!async)
2309                 cl_req_completion(env, aa->aa_clerq, rc);
2310         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2311
2312         RETURN(rc);
2313 }
2314
2315 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2316                                             struct client_obd *cli,
2317                                             cfs_list_t *rpc_list,
2318                                             int page_count, int cmd)
2319 {
2320         struct ptlrpc_request *req;
2321         struct brw_page **pga = NULL;
2322         struct osc_brw_async_args *aa;
2323         struct obdo *oa = NULL;
2324         const struct obd_async_page_ops *ops = NULL;
2325         void *caller_data = NULL;
2326         struct osc_async_page *oap;
2327         struct osc_async_page *tmp;
2328         struct ost_body *body;
2329         struct cl_req *clerq = NULL;
2330         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2331         struct ldlm_lock *lock = NULL;
2332         struct cl_req_attr crattr;
2333         int i, rc, mpflag = 0;
2334
2335         ENTRY;
2336         LASSERT(!cfs_list_empty(rpc_list));
2337
2338         if (cmd & OBD_BRW_MEMALLOC)
2339                 mpflag = cfs_memory_pressure_get_and_set();
2340
2341         memset(&crattr, 0, sizeof crattr);
2342         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2343         if (pga == NULL)
2344                 GOTO(out, req = ERR_PTR(-ENOMEM));
2345
2346         OBDO_ALLOC(oa);
2347         if (oa == NULL)
2348                 GOTO(out, req = ERR_PTR(-ENOMEM));
2349
2350         i = 0;
2351         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2352                 struct cl_page *page = osc_oap2cl_page(oap);
2353                 if (ops == NULL) {
2354                         ops = oap->oap_caller_ops;
2355                         caller_data = oap->oap_caller_data;
2356
2357                         clerq = cl_req_alloc(env, page, crt,
2358                                              1 /* only 1-object rpcs for
2359                                                 * now */);
2360                         if (IS_ERR(clerq))
2361                                 GOTO(out, req = (void *)clerq);
2362                         lock = oap->oap_ldlm_lock;
2363                 }
2364                 pga[i] = &oap->oap_brw_page;
2365                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2366                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2367                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2368                 i++;
2369                 cl_req_page_add(env, clerq, page);
2370         }
2371
2372         /* always get the data for the obdo for the rpc */
2373         LASSERT(ops != NULL);
2374         crattr.cra_oa = oa;
2375         crattr.cra_capa = NULL;
2376         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2377         if (lock) {
2378                 oa->o_handle = lock->l_remote_handle;
2379                 oa->o_valid |= OBD_MD_FLHANDLE;
2380         }
2381
2382         rc = cl_req_prep(env, clerq);
2383         if (rc != 0) {
2384                 CERROR("cl_req_prep failed: %d\n", rc);
2385                 GOTO(out, req = ERR_PTR(rc));
2386         }
2387
2388         sort_brw_pages(pga, page_count);
2389         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2390                                   pga, &req, crattr.cra_capa, 1);
2391         if (rc != 0) {
2392                 CERROR("prep_req failed: %d\n", rc);
2393                 GOTO(out, req = ERR_PTR(rc));
2394         }
2395
2396         if (cmd & OBD_BRW_MEMALLOC)
2397                 req->rq_memalloc = 1;
2398
2399         /* Need to update the timestamps after the request is built, in case
2400          * we race with setattr (locally or queued at the OST).  If the OST
2401          * receives the later setattr before the earlier BRW (as determined by
2402          * the request xid), it will not use the BRW timestamps.  Sadly, there
2403          * is no obvious way to do this in a single call.  bug 10150 */
2404         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2405         cl_req_attr_set(env, clerq, &crattr,
2406                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2407
2408         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2409         aa = ptlrpc_req_async_args(req);
2410         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2411         cfs_list_splice(rpc_list, &aa->aa_oaps);
2412         CFS_INIT_LIST_HEAD(rpc_list);
2413         aa->aa_clerq = clerq;
2414 out:
2415         if (cmd & OBD_BRW_MEMALLOC)
2416                 cfs_memory_pressure_restore(mpflag);
2417
2418         capa_put(crattr.cra_capa);
2419         if (IS_ERR(req)) {
2420                 if (oa)
2421                         OBDO_FREE(oa);
2422                 if (pga)
2423                         OBD_FREE(pga, sizeof(*pga) * page_count);
2424                 /* this should happen rarely and is pretty bad: it makes the
2425                  * pending list stop following the dirty order */
2426                 client_obd_list_lock(&cli->cl_loi_list_lock);
2427                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2428                         cfs_list_del_init(&oap->oap_rpc_item);
2429
2430                         /* queued sync pages can be torn down while the pages
2431                          * are between the pending list and the rpc */
2432                         if (oap->oap_interrupted) {
2433                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2434                                 osc_ap_completion(env, cli, NULL, oap, 0,
2435                                                   oap->oap_count);
2436                                 continue;
2437                         }
2438                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2439                 }
2440                 if (clerq && !IS_ERR(clerq))
2441                         cl_req_completion(env, clerq, PTR_ERR(req));
2442         }
2443         RETURN(req);
2444 }
2445
2446 /**
2447  * prepare pages for ASYNC io and put pages in send queue.
2448  *
2449  * \param cmd OBD_BRW_* macros
2450  * \param lop pending pages
2451  *
2452  * \return zero if no pages were added to the send queue.
2453  * \return 1 if pages were successfully added to the send queue.
2454  * \return negative on errors.
2455  */
2456 static int
2457 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2458                  struct lov_oinfo *loi,
2459                  int cmd, struct loi_oap_pages *lop)
2460 {
2461         struct ptlrpc_request *req;
2462         obd_count page_count = 0;
2463         struct osc_async_page *oap = NULL, *tmp;
2464         struct osc_brw_async_args *aa;
2465         const struct obd_async_page_ops *ops;
2466         CFS_LIST_HEAD(rpc_list);
2467         CFS_LIST_HEAD(tmp_list);
2468         unsigned int ending_offset;
2469         unsigned  starting_offset = 0;
2470         int srvlock = 0, mem_tight = 0;
2471         struct cl_object *clob = NULL;
2472         ENTRY;
2473
2474         /* ASYNC_HP pages first. At present, when the lock covering the
2475          * pages is about to be cancelled, the pages it covers are marked
2476          * ASYNC_HP. We have to send them out as soon as possible. */
2477         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2478                 if (oap->oap_async_flags & ASYNC_HP)
2479                         cfs_list_move(&oap->oap_pending_item, &tmp_list);
2480                 else
2481                         cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2482                 if (++page_count >= cli->cl_max_pages_per_rpc)
2483                         break;
2484         }
2485
2486         cfs_list_splice(&tmp_list, &lop->lop_pending);
2487         page_count = 0;
2488
2489         /* first we find the pages we're allowed to work with */
2490         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2491                                      oap_pending_item) {
2492                 ops = oap->oap_caller_ops;
2493
2494                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2495                          "magic 0x%x\n", oap, oap->oap_magic);
2496
2497                 if (clob == NULL) {
2498                         /* pin object in memory, so that completion call-backs
2499                          * can be safely called under client_obd_list lock. */
2500                         clob = osc_oap2cl_page(oap)->cp_obj;
2501                         cl_object_get(clob);
2502                 }
2503
2504                 if (page_count != 0 &&
2505                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2506                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2507                                " oap %p, page %p, srvlock %u\n",
2508                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2509                         break;
2510                 }
2511
2512                 /* If there is a gap at the start of this page, it can't merge
2513                  * with any previous page, so we'll hand the network a
2514                  * "fragmented" page array that it can't transfer in 1 RDMA */
2515                 if (page_count != 0 && oap->oap_page_off != 0)
2516                         break;
2517
2518                 /* In llite, being 'ready' equates to the page being locked
2519                  * until completion unlocks it.  commit_write submits a page
2520                  * as not ready because its unlock will happen unconditionally
2521                  * when the call returns.  If we race with commit_write giving
2522                  * us that page we don't want to create a hole in the page
2523                  * stream, so we stop and leave the rpc to be fired by
2524                  * another dirtier or kupdated interval (the not-ready page
2525                  * will still be on the dirty list).  We could call in
2526                  * at the end of ll_file_write to process the queue again. */
2527                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2528                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2529                                                     cmd);
2530                         if (rc < 0)
2531                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2532                                                 "instead of ready\n", oap,
2533                                                 oap->oap_page, rc);
2534                         switch (rc) {
2535                         case -EAGAIN:
2536                                 /* llite is telling us that the page is still
2537                                  * in commit_write and that we should try
2538                                  * and put it in an rpc again later.  we
2539                                  * break out of the loop so we don't create
2540                                  * a hole in the sequence of pages in the rpc
2541                                  * stream.*/
2542                                 oap = NULL;
2543                                 break;
2544                         case -EINTR:
2545                                 /* the io isn't needed; tell the checks
2546                                  * below to complete the rpc with -EINTR */
2547                                 cfs_spin_lock(&oap->oap_lock);
2548                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2549                                 cfs_spin_unlock(&oap->oap_lock);
2550                                 oap->oap_count = -EINTR;
2551                                 break;
2552                         case 0:
2553                                 cfs_spin_lock(&oap->oap_lock);
2554                                 oap->oap_async_flags |= ASYNC_READY;
2555                                 cfs_spin_unlock(&oap->oap_lock);
2556                                 break;
2557                         default:
2558                                 LASSERTF(0, "oap %p page %p returned %d "
2559                                             "from make_ready\n", oap,
2560                                             oap->oap_page, rc);
2561                                 break;
2562                         }
2563                 }
2564                 if (oap == NULL)
2565                         break;
2566                 /*
2567                  * Page submitted for IO has to be locked. Either by
2568                  * ->ap_make_ready() or by higher layers.
2569                  */
2570 #if defined(__KERNEL__) && defined(__linux__)
2571                 {
2572                         struct cl_page *page;
2573
2574                         page = osc_oap2cl_page(oap);
2575
2576                         if (page->cp_type == CPT_CACHEABLE &&
2577                             !(PageLocked(oap->oap_page) &&
2578                               (CheckWriteback(oap->oap_page, cmd)))) {
2579                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2580                                        oap->oap_page,
2581                                        (long)oap->oap_page->flags,
2582                                        oap->oap_async_flags);
2583                                 LBUG();
2584                         }
2585                 }
2586 #endif
2587
2588                 /* take the page out of our book-keeping */
2589                 cfs_list_del_init(&oap->oap_pending_item);
2590                 lop_update_pending(cli, lop, cmd, -1);
2591                 cfs_list_del_init(&oap->oap_urgent_item);
2592
2593                 if (page_count == 0)
2594                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2595                                           (PTLRPC_MAX_BRW_SIZE - 1);
2596
2597                 /* ask the caller for the size of the io as the rpc leaves. */
2598                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2599                         oap->oap_count =
2600                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2601                                                       cmd);
2602                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2603                 }
2604                 if (oap->oap_count <= 0) {
2605                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2606                                oap->oap_count);
2607                         osc_ap_completion(env, cli, NULL,
2608                                           oap, 0, oap->oap_count);
2609                         continue;
2610                 }
2611
2612                 /* now put the page back in our accounting */
2613                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2614                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2615                         mem_tight = 1;
2616                 if (page_count == 0)
2617                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2618                 if (++page_count >= cli->cl_max_pages_per_rpc)
2619                         break;
2620
2621                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2622                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2623                  * have the same alignment as the initial writes that allocated
2624                  * extents on the server. */
2625                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2626                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2627                 if (ending_offset == 0)
2628                         break;
2629
2630                 /* If there is a gap at the end of this page, it can't merge
2631                  * with any subsequent pages, so we'll hand the network a
2632                  * "fragmented" page array that it can't transfer in 1 RDMA */
2633                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2634                         break;
2635         }
2636
2637         osc_wake_cache_waiters(cli);
2638         osc_wake_sync_fs(cli);
2639         loi_list_maint(cli, loi);
2640
2641         client_obd_list_unlock(&cli->cl_loi_list_lock);
2642
2643         if (clob != NULL)
2644                 cl_object_put(env, clob);
2645
2646         if (page_count == 0) {
2647                 client_obd_list_lock(&cli->cl_loi_list_lock);
2648                 RETURN(0);
2649         }
2650
2651         req = osc_build_req(env, cli, &rpc_list, page_count,
2652                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2653         if (IS_ERR(req)) {
2654                 LASSERT(cfs_list_empty(&rpc_list));
2655                 loi_list_maint(cli, loi);
2656                 RETURN(PTR_ERR(req));
2657         }
2658
2659         aa = ptlrpc_req_async_args(req);
2660
2661         if (cmd == OBD_BRW_READ) {
2662                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2663                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2664                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2665                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2666         } else {
2667                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2668                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2669                                  cli->cl_w_in_flight);
2670                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2671                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2672         }
2673         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2674
2675         client_obd_list_lock(&cli->cl_loi_list_lock);
2676
2677         if (cmd == OBD_BRW_READ)
2678                 cli->cl_r_in_flight++;
2679         else
2680                 cli->cl_w_in_flight++;
2681
2682         /* queued sync pages can be torn down while the pages
2683          * are between the pending list and the rpc */
2684         tmp = NULL;
2685         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2686                 /* only one oap gets a request reference */
2687                 if (tmp == NULL)
2688                         tmp = oap;
2689                 if (oap->oap_interrupted && !req->rq_intr) {
2690                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2691                                oap, req);
2692                         ptlrpc_mark_interrupted(req);
2693                 }
2694         }
2695         if (tmp != NULL)
2696                 tmp->oap_request = ptlrpc_request_addref(req);
2697
2698         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2699                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2700
2701         req->rq_interpret_reply = brw_interpret;
2702         ptlrpcd_add_req(req, PSCOPE_BRW);
2703         RETURN(1);
2704 }
2705
2706 #define LOI_DEBUG(LOI, STR, args...)                                     \
2707         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2708                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2709                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2710                (LOI)->loi_write_lop.lop_num_pending,                     \
2711                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2712                (LOI)->loi_read_lop.lop_num_pending,                      \
2713                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2714                args)
2715
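/* Example: LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli)) prints the
 * object's ready flag, its pending/urgent write and read counts, and then
 * the caller's own message. */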
2716 /* This is called by osc_check_rpcs() to find which objects have pages that
2717  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2718 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2719 {
2720         ENTRY;
2721
2722         /* First return objects that have blocked locks so that they
2723          * will be flushed quickly and other clients can get the lock,
2724          * then objects which have pages ready to be stuffed into RPCs */
2725         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2726                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2727                                       struct lov_oinfo, loi_hp_ready_item));
2728         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2729                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2730                                       struct lov_oinfo, loi_ready_item));
2731         if (!cfs_list_empty(&cli->cl_loi_sync_fs_list))
2732                 RETURN(cfs_list_entry(cli->cl_loi_sync_fs_list.next,
2733                                       struct lov_oinfo, loi_sync_fs_item));
2734
2735         /* then if we have cache waiters, return all objects with queued
2736          * writes.  This is especially important when many small files
2737          * have filled up the cache and not been fired into rpcs because
2738          * they don't pass the nr_pending/object threshold */
2739         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2740             !cfs_list_empty(&cli->cl_loi_write_list))
2741                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2742                                       struct lov_oinfo, loi_write_item));
2743
2744         /* then return all queued objects when we have an invalid import
2745          * so that they get flushed */
2746         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2747                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2748                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2749                                               struct lov_oinfo,
2750                                               loi_write_item));
2751                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2752                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2753                                               struct lov_oinfo, loi_read_item));
2754         }
2755         RETURN(NULL);
2756 }
2757
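/* Returns true if the client is already at its RPCs-in-flight limit for this
 * object.  One extra slot is allowed when the next urgent page on either the
 * read or write list is high-priority (ASYNC_HP), which is what the +hprpc
 * term below implements. */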
2758 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2759 {
2760         struct osc_async_page *oap;
2761         int hprpc = 0;
2762
2763         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2764                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2765                                      struct osc_async_page, oap_urgent_item);
2766                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2767         }
2768
2769         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2770                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2771                                      struct osc_async_page, oap_urgent_item);
2772                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2773         }
2774
2775         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2776 }
2777
2778 /* called with the loi list lock held */
2779 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2780 {
2781         struct lov_oinfo *loi;
2782         int rc = 0, race_counter = 0;
2783         ENTRY;
2784
2785         while ((loi = osc_next_loi(cli)) != NULL) {
2786                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2787
2788                 if (osc_max_rpc_in_flight(cli, loi))
2789                         break;
2790
2791                 /* attempt some read/write balancing by alternating between
2792                  * reads and writes on an object.  The lop_makes_rpc() checks
2793                  * here would be redundant if we were getting read/write work
2794                  * items instead of objects.  We don't want osc_send_oap_rpc()
2795                  * to drain a partial read pending queue when we're given this
2796                  * object to do write I/O while there are cache waiters */
2797                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2798                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2799                                               &loi->loi_write_lop);
2800                         if (rc < 0) {
2801                                 CERROR("Write request failed with %d\n", rc);
2802
2803                                 /* osc_send_oap_rpc failed, mostly because of
2804                                  * memory pressure.
2805                                  *
2806                                  * We can't break out here, because if:
2807                                  *  - a page was submitted by osc_io_submit,
2808                                  *    so that page is locked;
2809                                  *  - no request is in flight; and
2810                                  *  - no subsequent request is issued,
2811                                  * then the system ends up live-locked,
2812                                  * because there is no further chance to
2813                                  * call osc_io_unplug() or osc_check_rpcs().
2814                                  * pdflush can't help in this case either,
2815                                  * because it might be blocked grabbing the
2816                                  * page lock mentioned above.
2817                                  *
2818                                  * Anyway, continue to drain pages. */
2819                                 /* break; */
2820                         }
2821
2822                         if (rc > 0)
2823                                 race_counter = 0;
2824                         else
2825                                 race_counter++;
2826                 }
2827                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2828                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2829                                               &loi->loi_read_lop);
2830                         if (rc < 0)
2831                                 CERROR("Read request failed with %d\n", rc);
2832
2833                         if (rc > 0)
2834                                 race_counter = 0;
2835                         else
2836                                 race_counter++;
2837                 }
2838
2839                 /* attempt some inter-object balancing by issuing rpcs
2840                  * for each object in turn */
2841                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2842                         cfs_list_del_init(&loi->loi_hp_ready_item);
2843                 if (!cfs_list_empty(&loi->loi_ready_item))
2844                         cfs_list_del_init(&loi->loi_ready_item);
2845                 if (!cfs_list_empty(&loi->loi_write_item))
2846                         cfs_list_del_init(&loi->loi_write_item);
2847                 if (!cfs_list_empty(&loi->loi_read_item))
2848                         cfs_list_del_init(&loi->loi_read_item);
2849                 if (!cfs_list_empty(&loi->loi_sync_fs_item))
2850                         cfs_list_del_init(&loi->loi_sync_fs_item);
2851
2852                 loi_list_maint(cli, loi);
2853
2854                 /* send_oap_rpc fails with 0 when make_ready tells it to
2855                  * back off.  llite's make_ready does this when it tries
2856                  * to lock a page queued for write that is already locked.
2857                  * we want to try sending rpcs from many objects, but we
2858                  * don't want to spin failing with 0.  */
2859                 if (race_counter == 10)
2860                         break;
2861         }
2862         EXIT;
2863 }
2864
2865 /* we're trying to queue a page in the osc so we're subject to the
2866  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2867  * If the osc's queued pages are already at that limit, then we want to sleep
2868  * until there is space in the osc's queue for us.  We also may be waiting for
2869  * write credits from the OST if there are RPCs in flight that may return some
2870  * before we fall back to sync writes.
2871  *
2872  * We need this to know whether our allocation was granted in the presence of signals */
2873 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2874 {
2875         int rc;
2876         ENTRY;
2877         client_obd_list_lock(&cli->cl_loi_list_lock);
2878         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2879         client_obd_list_unlock(&cli->cl_loi_list_lock);
2880         RETURN(rc);
2881 }
2882
2883 /**
2884  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2885  * is available.
2886  */
2887 int osc_enter_cache_try(const struct lu_env *env,
2888                         struct client_obd *cli, struct lov_oinfo *loi,
2889                         struct osc_async_page *oap, int transient)
2890 {
2891         int has_grant;
2892
2893         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2894         if (has_grant) {
2895                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2896                 if (transient) {
2897                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2898                         cfs_atomic_inc(&obd_dirty_transit_pages);
2899                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2900                 }
2901         }
2902         return has_grant;
2903 }
2904
2905 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2906  * grant or cache space. */
2907 static int osc_enter_cache(const struct lu_env *env,
2908                            struct client_obd *cli, struct lov_oinfo *loi,
2909                            struct osc_async_page *oap)
2910 {
2911         struct osc_cache_waiter ocw;
2912         struct l_wait_info lwi = { 0 };
2913
2914         ENTRY;
2915
2916         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2917                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2918                cli->cl_dirty_max, obd_max_dirty_pages,
2919                cli->cl_lost_grant, cli->cl_avail_grant);
2920
2921         /* force the caller to try sync io.  this can jump the list
2922          * of queued writes and create a discontiguous rpc stream */
2923         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2924             loi->loi_ar.ar_force_sync)
2925                 RETURN(-EDQUOT);
2926
2927         /* Hopefully normal case - cache space and write credits available */
2928         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2929             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2930             osc_enter_cache_try(env, cli, loi, oap, 0))
2931                 RETURN(0);
2932
2933         /* It is safe to block as a cache waiter as long as there is grant
2934          * space available or the hope of additional grant being returned
2935          * when an in flight write completes.  Using the write back cache
2936          * if possible is preferable to sending the data synchronously
2937          * because write pages can then be merged in to large requests.
2938          * The addition of this cache waiter will cause pending write
2939          * pages to be sent immediately. */
2940         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2941                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2942                 cfs_waitq_init(&ocw.ocw_waitq);
2943                 ocw.ocw_oap = oap;
2944                 ocw.ocw_rc = 0;
2945
2946                 loi_list_maint(cli, loi);
2947                 osc_check_rpcs(env, cli);
2948                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2949
2950                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2951                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2952
2953                 client_obd_list_lock(&cli->cl_loi_list_lock);
2954                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2955                         cfs_list_del(&ocw.ocw_entry);
2956                         RETURN(-EINTR);
2957                 }
2958                 RETURN(ocw.ocw_rc);
2959         }
2960
2961         RETURN(-EDQUOT);
2962 }
2963
2964
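/* Prepare an osc_async_page for @page at object offset @offset.  Called with
 * a NULL page this returns the rounded oap size, presumably so the caller
 * knows how much storage to reserve; otherwise *res points at that storage
 * and the oap is initialized here. */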
2965 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2966                         struct lov_oinfo *loi, cfs_page_t *page,
2967                         obd_off offset, const struct obd_async_page_ops *ops,
2968                         void *data, void **res, int nocache,
2969                         struct lustre_handle *lockh)
2970 {
2971         struct osc_async_page *oap;
2972
2973         ENTRY;
2974
2975         if (!page)
2976                 return cfs_size_round(sizeof(*oap));
2977
2978         oap = *res;
2979         oap->oap_magic = OAP_MAGIC;
2980         oap->oap_cli = &exp->exp_obd->u.cli;
2981         oap->oap_loi = loi;
2982
2983         oap->oap_caller_ops = ops;
2984         oap->oap_caller_data = data;
2985
2986         oap->oap_page = page;
2987         oap->oap_obj_off = offset;
2988         if (!client_is_remote(exp) &&
2989             cfs_capable(CFS_CAP_SYS_RESOURCE))
2990                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2991
2992         LASSERT(!(offset & ~CFS_PAGE_MASK));
2993
2994         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2995         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2996         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2997         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2998
2999         cfs_spin_lock_init(&oap->oap_lock);
3000         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
3001         RETURN(0);
3002 }
3003
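/* Convert an opaque cookie back into an osc_async_page, validating the magic
 * value that osc_prep_async_page() stamped into it. */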
3004 struct osc_async_page *oap_from_cookie(void *cookie)
3005 {
3006         struct osc_async_page *oap = cookie;
3007         if (oap->oap_magic != OAP_MAGIC)
3008                 return ERR_PTR(-EINVAL);
3009         return oap;
3010 }
3011
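/* Queue a prepared page for asynchronous I/O.  For writes this checks the
 * file owner/group quota and enters the dirty-page cache (which may block
 * waiting for grant), then moves the oap to the pending list and kicks
 * osc_check_rpcs() to try to send it. */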
3012 int osc_queue_async_io(const struct lu_env *env,
3013                        struct obd_export *exp, struct lov_stripe_md *lsm,
3014                        struct lov_oinfo *loi, void *cookie,
3015                        int cmd, obd_off off, int count,
3016                        obd_flag brw_flags, enum async_flags async_flags)
3017 {
3018         struct client_obd *cli = &exp->exp_obd->u.cli;
3019         struct osc_async_page *oap;
3020         int rc = 0;
3021         ENTRY;
3022
3023         oap = oap_from_cookie(cookie);
3024         if (IS_ERR(oap))
3025                 RETURN(PTR_ERR(oap));
3026
3027         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3028                 RETURN(-EIO);
3029
3030         if (!cfs_list_empty(&oap->oap_pending_item) ||
3031             !cfs_list_empty(&oap->oap_urgent_item) ||
3032             !cfs_list_empty(&oap->oap_rpc_item))
3033                 RETURN(-EBUSY);
3034
3035         /* check if the file's owner/group is over quota */
3036         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3037                 struct cl_object *obj;
3038                 struct cl_attr    attr; /* XXX put attr into thread info */
3039                 unsigned int qid[MAXQUOTAS];
3040
3041                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3042
3043                 cl_object_attr_lock(obj);
3044                 rc = cl_object_attr_get(env, obj, &attr);
3045                 cl_object_attr_unlock(obj);
3046
3047                 qid[USRQUOTA] = attr.cat_uid;
3048                 qid[GRPQUOTA] = attr.cat_gid;
3049                 if (rc == 0 &&
3050                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3051                         rc = -EDQUOT;
3052                 if (rc)
3053                         RETURN(rc);
3054         }
3055
3056         if (loi == NULL)
3057                 loi = lsm->lsm_oinfo[0];
3058
3059         client_obd_list_lock(&cli->cl_loi_list_lock);
3060
3061         LASSERT(off + count <= CFS_PAGE_SIZE);
3062         oap->oap_cmd = cmd;
3063         oap->oap_page_off = off;
3064         oap->oap_count = count;
3065         oap->oap_brw_flags = brw_flags;
3066         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3067         if (cfs_memory_pressure_get())
3068                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3069         cfs_spin_lock(&oap->oap_lock);
3070         oap->oap_async_flags = async_flags;
3071         cfs_spin_unlock(&oap->oap_lock);
3072
3073         if (cmd & OBD_BRW_WRITE) {
3074                 rc = osc_enter_cache(env, cli, loi, oap);
3075                 if (rc) {
3076                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3077                         RETURN(rc);
3078                 }
3079         }
3080
3081         osc_oap_to_pending(oap);
3082         loi_list_maint(cli, loi);
3083
3084         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3085                   cmd);
3086
3087         osc_check_rpcs(env, cli);
3088         client_obd_list_unlock(&cli->cl_loi_list_lock);
3089
3090         RETURN(0);
3091 }
3092
3093 /* aka (~was & now & flag), but this is more clear :) */
3094 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
3095
3096 int osc_set_async_flags_base(struct client_obd *cli,
3097                              struct lov_oinfo *loi, struct osc_async_page *oap,
3098                              obd_flag async_flags)
3099 {
3100         struct loi_oap_pages *lop;
3101         int flags = 0;
3102         ENTRY;
3103
3104         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3105
3106         if (oap->oap_cmd & OBD_BRW_WRITE) {
3107                 lop = &loi->loi_write_lop;
3108         } else {
3109                 lop = &loi->loi_read_lop;
3110         }
3111
3112         if ((oap->oap_async_flags & async_flags) == async_flags)
3113                 RETURN(0);
3114
3115         /* XXX: This introduces a tiny, insignificant race if this
3116          * loi already had other urgent items.
3117          */
3118         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_SYNCFS) &&
3119             cfs_list_empty(&oap->oap_rpc_item) &&
3120             cfs_list_empty(&oap->oap_urgent_item)) {
3121                 osc_add_to_lop_urgent(lop, oap, ASYNC_SYNCFS);
3122                 flags |= ASYNC_SYNCFS;
3123                 cfs_spin_lock(&oap->oap_lock);
3124                 oap->oap_async_flags |= flags;
3125                 cfs_spin_unlock(&oap->oap_lock);
3126                 loi_list_maint(cli, loi);
3127                 RETURN(0);
3128         }
3129
3130         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3131                 flags |= ASYNC_READY;
3132
3133         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3134             cfs_list_empty(&oap->oap_rpc_item)) {
3135                 osc_add_to_lop_urgent(lop, oap, ASYNC_URGENT);
3136                 flags |= ASYNC_URGENT;
3137                 loi_list_maint(cli, loi);
3138         }
3139         cfs_spin_lock(&oap->oap_lock);
3140         oap->oap_async_flags |= flags;
3141         cfs_spin_unlock(&oap->oap_lock);
3142
3143         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3144                         oap->oap_async_flags);
3145         RETURN(0);
3146 }
3147
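/* Undo osc_queue_async_io() for a page that has not yet been put into an
 * RPC: release its cache/grant accounting and drop it from the urgent and
 * pending lists.  Returns -EBUSY if the oap is already part of an RPC in
 * flight. */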
3148 int osc_teardown_async_page(struct obd_export *exp,
3149                             struct lov_stripe_md *lsm,
3150                             struct lov_oinfo *loi, void *cookie)
3151 {
3152         struct client_obd *cli = &exp->exp_obd->u.cli;
3153         struct loi_oap_pages *lop;
3154         struct osc_async_page *oap;
3155         int rc = 0;
3156         ENTRY;
3157
3158         oap = oap_from_cookie(cookie);
3159         if (IS_ERR(oap))
3160                 RETURN(PTR_ERR(oap));
3161
3162         if (loi == NULL)
3163                 loi = lsm->lsm_oinfo[0];
3164
3165         if (oap->oap_cmd & OBD_BRW_WRITE) {
3166                 lop = &loi->loi_write_lop;
3167         } else {
3168                 lop = &loi->loi_read_lop;
3169         }
3170
3171         client_obd_list_lock(&cli->cl_loi_list_lock);
3172
3173         if (!cfs_list_empty(&oap->oap_rpc_item))
3174                 GOTO(out, rc = -EBUSY);
3175
3176         osc_exit_cache(cli, oap, 0);
3177         osc_wake_cache_waiters(cli);
3178
3179         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3180                 cfs_list_del_init(&oap->oap_urgent_item);
3181                 cfs_spin_lock(&oap->oap_lock);
3182                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP |
3183                                           ASYNC_SYNCFS);
3184                 cfs_spin_unlock(&oap->oap_lock);
3185         }
3186         if (!cfs_list_empty(&oap->oap_pending_item)) {
3187                 cfs_list_del_init(&oap->oap_pending_item);
3188                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3189         }
3190         loi_list_maint(cli, loi);
3191         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3192 out:
3193         client_obd_list_unlock(&cli->cl_loi_list_lock);
3194         RETURN(rc);
3195 }
3196
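/* Attach the caller's AST payload to @lock, first asserting that the lock
 * really carries the callbacks and resource type described in @einfo and
 * that any existing l_ast_data matches. */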
3197 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3198                                          struct ldlm_enqueue_info *einfo,
3199                                          int flags)
3200 {
3201         void *data = einfo->ei_cbdata;
3202
3203         LASSERT(lock != NULL);
3204         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3205         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3206         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3207         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3208
3209         lock_res_and_lock(lock);
3210         cfs_spin_lock(&osc_ast_guard);
3211         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3212         lock->l_ast_data = data;
3213         cfs_spin_unlock(&osc_ast_guard);
3214         unlock_res_and_lock(lock);
3215 }
3216
3217 static void osc_set_data_with_check(struct lustre_handle *lockh,
3218                                     struct ldlm_enqueue_info *einfo,
3219                                     int flags)
3220 {
3221         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3222
3223         if (lock != NULL) {
3224                 osc_set_lock_data_with_check(lock, einfo, flags);
3225                 LDLM_LOCK_PUT(lock);
3226         } else
3227                 CERROR("lockh %p, data %p - client evicted?\n",
3228                        lockh, einfo->ei_cbdata);
3229 }
3230
3231 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3232                              ldlm_iterator_t replace, void *data)
3233 {
3234         struct ldlm_res_id res_id;
3235         struct obd_device *obd = class_exp2obd(exp);
3236
3237         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3238         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3239         return 0;
3240 }
3241
3242 /* Find any ldlm lock of the inode in osc.
3243  * Returns: 0    no lock found
3244  *          1    one found
3245  *        < 0    error */
3246 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3247                            ldlm_iterator_t replace, void *data)
3248 {
3249         struct ldlm_res_id res_id;
3250         struct obd_device *obd = class_exp2obd(exp);
3251         int rc = 0;
3252
3253         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3254         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3255         if (rc == LDLM_ITER_STOP)
3256                 return(1);
3257         if (rc == LDLM_ITER_CONTINUE)
3258                 return(0);
3259         return(rc);
3260 }
3261
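/* Finish an enqueue: for intent requests, pick up the server's verdict from
 * the DLM reply (a non-zero lock_policy_res1 overrides rc); on success mark
 * the LVB ready and log the size/blocks/mtime it carries, then hand the
 * final result to the caller's upcall. */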
3262 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3263                             obd_enqueue_update_f upcall, void *cookie,
3264                             int *flags, int rc)
3265 {
3266         int intent = *flags & LDLM_FL_HAS_INTENT;
3267         ENTRY;
3268
3269         if (intent) {
3270                 /* The request was created before ldlm_cli_enqueue call. */
3271                 if (rc == ELDLM_LOCK_ABORTED) {
3272                         struct ldlm_reply *rep;
3273                         rep = req_capsule_server_get(&req->rq_pill,
3274                                                      &RMF_DLM_REP);
3275
3276                         LASSERT(rep != NULL);
3277                         if (rep->lock_policy_res1)
3278                                 rc = rep->lock_policy_res1;
3279                 }
3280         }
3281
3282         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3283                 *flags |= LDLM_FL_LVB_READY;
3284                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3285                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3286         }
3287
3288         /* Call the update callback. */
3289         rc = (*upcall)(cookie, rc);
3290         RETURN(rc);
3291 }
3292
3293 static int osc_enqueue_interpret(const struct lu_env *env,
3294                                  struct ptlrpc_request *req,
3295                                  struct osc_enqueue_args *aa, int rc)
3296 {
3297         struct ldlm_lock *lock;
3298         struct lustre_handle handle;
3299         __u32 mode;
3300
3301         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3302          * might be freed anytime after lock upcall has been called. */
3303         lustre_handle_copy(&handle, aa->oa_lockh);
3304         mode = aa->oa_ei->ei_mode;
3305
3306         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3307          * be valid. */
3308         lock = ldlm_handle2lock(&handle);
3309
3310         /* Take an additional reference so that a blocking AST that
3311          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3312          * to arrive after an upcall has been executed by
3313          * osc_enqueue_fini(). */
3314         ldlm_lock_addref(&handle, mode);
3315
3316         /* Let the CP AST grant the lock first. */
3317         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3318
3319         /* Complete obtaining the lock procedure. */
3320         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3321                                    mode, aa->oa_flags, aa->oa_lvb,
3322                                    sizeof(*aa->oa_lvb), &handle, rc);
3323         /* Complete osc stuff. */
3324         rc = osc_enqueue_fini(req, aa->oa_lvb,
3325                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3326
3327         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3328
3329         /* Release the lock for async request. */
3330         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3331                 /*
3332                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3333                  * not already released by
3334                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3335                  */
3336                 ldlm_lock_decref(&handle, mode);
3337
3338         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3339                  aa->oa_lockh, req, aa);
3340         ldlm_lock_decref(&handle, mode);
3341         LDLM_LOCK_PUT(lock);
3342         return rc;
3343 }
3344
3345 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3346                         struct lov_oinfo *loi, int flags,
3347                         struct ost_lvb *lvb, __u32 mode, int rc)
3348 {
3349         if (rc == ELDLM_OK) {
3350                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3351                 __u64 tmp;
3352
3353                 LASSERT(lock != NULL);
3354                 loi->loi_lvb = *lvb;
3355                 tmp = loi->loi_lvb.lvb_size;
3356                 /* Extend KMS up to the end of this lock and no further
3357                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
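                /* Example: with lvb_size == 1MB, a lock on extent [0, 4095]
                 * caps the new KMS at 4096, while a whole-file lock (extent
                 * end OBD_OBJECT_EOF) leaves it at the full 1MB. */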
3358                 if (tmp > lock->l_policy_data.l_extent.end)
3359                         tmp = lock->l_policy_data.l_extent.end + 1;
3360                 if (tmp >= loi->loi_kms) {
3361                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3362                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3363                         loi_kms_set(loi, tmp);
3364                 } else {
3365                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3366                                    LPU64"; leaving kms="LPU64", end="LPU64,
3367                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3368                                    lock->l_policy_data.l_extent.end);
3369                 }
3370                 ldlm_lock_allow_match(lock);
3371                 LDLM_LOCK_PUT(lock);
3372         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3373                 loi->loi_lvb = *lvb;
3374                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3375                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3376                 rc = ELDLM_OK;
3377         }
3378 }
3379 EXPORT_SYMBOL(osc_update_enqueue);
3380
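/* Sentinel rqset value: callers pass PTLRPCD_SET to ask osc_enqueue_base()
 * to hand the request to the ptlrpcd daemon instead of adding it to a
 * caller-owned request set (see the rqset == PTLRPCD_SET check below). */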
3381 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3382
3383 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
3384  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
3385  * with other synchronous requests; however, keeping some locks while trying to
3386  * obtain others may take a considerable amount of time in case of OST failure,
3387  * and when other sync requests do not get a released lock from a client, the
3388  * client is excluded from the cluster -- such scenarios make life difficult,
3389  * so release locks just after they are obtained. */
3390 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3391                      int *flags, ldlm_policy_data_t *policy,
3392                      struct ost_lvb *lvb, int kms_valid,
3393                      obd_enqueue_update_f upcall, void *cookie,
3394                      struct ldlm_enqueue_info *einfo,
3395                      struct lustre_handle *lockh,
3396                      struct ptlrpc_request_set *rqset, int async)
3397 {
3398         struct obd_device *obd = exp->exp_obd;
3399         struct ptlrpc_request *req = NULL;
3400         int intent = *flags & LDLM_FL_HAS_INTENT;
3401         ldlm_mode_t mode;
3402         int rc;
3403         ENTRY;
3404
3405         /* Filesystem lock extents are extended to page boundaries so that
3406          * dealing with the page cache is a little smoother.  */
3407         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3408         policy->l_extent.end |= ~CFS_PAGE_MASK;
3409
3410         /*
3411          * kms is not valid when either object is completely fresh (so that no
3412          * locks are cached), or object was evicted. In the latter case cached
3413          * lock cannot be used, because it would prime inode state with
3414          * potentially stale LVB.
3415          */
3416         if (!kms_valid)
3417                 goto no_match;
3418
3419         /* Next, search for already existing extent locks that will cover us */
3420         /* If we're trying to read, we also search for an existing PW lock.  The
3421          * VFS and page cache already protect us locally, so lots of readers/
3422          * writers can share a single PW lock.
3423          *
3424          * There are problems with conversion deadlocks, so instead of
3425          * converting a read lock to a write lock, we'll just enqueue a new
3426          * one.
3427          *
3428          * At some point we should cancel the read lock instead of making them
3429          * send us a blocking callback, but there are problems with canceling
3430          * locks out from other users right now, too. */
3431         mode = einfo->ei_mode;
3432         if (einfo->ei_mode == LCK_PR)
3433                 mode |= LCK_PW;
3434         mode = ldlm_lock_match(obd->obd_namespace,
3435                                *flags | LDLM_FL_LVB_READY, res_id,
3436                                einfo->ei_type, policy, mode, lockh, 0);
3437         if (mode) {
3438                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3439
3440                 if (matched->l_ast_data == NULL ||
3441                     matched->l_ast_data == einfo->ei_cbdata) {
3442                         /* addref the lock only if this is not an async
3443                          * request and PW was matched whereas we asked for PR. */
3444                         if (!rqset && einfo->ei_mode != mode)
3445                                 ldlm_lock_addref(lockh, LCK_PR);
3446                         osc_set_lock_data_with_check(matched, einfo, *flags);
3447                         if (intent) {
3448                                 /* I would like to be able to ASSERT here that
3449                                  * rss <= kms, but I can't, for reasons which
3450                                  * are explained in lov_enqueue() */
3451                         }
3452
3453                         /* We already have a lock, and it's referenced */
3454                         (*upcall)(cookie, ELDLM_OK);
3455
3456                         /* For async requests, decref the lock. */
3457                         if (einfo->ei_mode != mode)
3458                                 ldlm_lock_decref(lockh, LCK_PW);
3459                         else if (rqset)
3460                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3461                         LDLM_LOCK_PUT(matched);
3462                         RETURN(ELDLM_OK);
3463                 } else
3464                         ldlm_lock_decref(lockh, mode);
3465                 LDLM_LOCK_PUT(matched);
3466         }
3467
3468  no_match:
3469         if (intent) {
3470                 CFS_LIST_HEAD(cancels);
3471                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3472                                            &RQF_LDLM_ENQUEUE_LVB);
3473                 if (req == NULL)
3474                         RETURN(-ENOMEM);
3475
3476                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3477                 if (rc) {
3478                         ptlrpc_request_free(req);
3479                         RETURN(rc);
3480                 }
3481
3482                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3483                                      sizeof *lvb);
3484                 ptlrpc_request_set_replen(req);
3485         }
3486
3487         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3488         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3489
3490         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3491                               sizeof(*lvb), lockh, async);
3492         if (rqset) {
3493                 if (!rc) {
3494                         struct osc_enqueue_args *aa;
3495                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3496                         aa = ptlrpc_req_async_args(req);
3497                         aa->oa_ei = einfo;
3498                         aa->oa_exp = exp;
3499                         aa->oa_flags  = flags;
3500                         aa->oa_upcall = upcall;
3501                         aa->oa_cookie = cookie;
3502                         aa->oa_lvb    = lvb;
3503                         aa->oa_lockh  = lockh;
3504
3505                         req->rq_interpret_reply =
3506                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3507                         if (rqset == PTLRPCD_SET)
3508                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3509                         else
3510                                 ptlrpc_set_add_req(rqset, req);
3511                 } else if (intent) {
3512                         ptlrpc_req_finished(req);
3513                 }
3514                 RETURN(rc);
3515         }
3516
3517         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3518         if (intent)
3519                 ptlrpc_req_finished(req);
3520
3521         RETURN(rc);
3522 }
3523
3524 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3525                        struct ldlm_enqueue_info *einfo,
3526                        struct ptlrpc_request_set *rqset)
3527 {
3528         struct ldlm_res_id res_id;
3529         int rc;
3530         ENTRY;
3531
3532         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3533                            oinfo->oi_md->lsm_object_seq, &res_id);
3534
3535         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3536                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3537                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3538                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3539                               rqset, rqset != NULL);
3540         RETURN(rc);
3541 }
3542
3543 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3544                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3545                    int *flags, void *data, struct lustre_handle *lockh,
3546                    int unref)
3547 {
3548         struct obd_device *obd = exp->exp_obd;
3549         int lflags = *flags;
3550         ldlm_mode_t rc;
3551         ENTRY;
3552
3553         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3554                 RETURN(-EIO);
3555
3556         /* Filesystem lock extents are extended to page boundaries so that
3557          * dealing with the page cache is a little smoother */
3558         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3559         policy->l_extent.end |= ~CFS_PAGE_MASK;
3560
3561         /* Next, search for already existing extent locks that will cover us */
3562         /* If we're trying to read, we also search for an existing PW lock.  The
3563          * VFS and page cache already protect us locally, so lots of readers/
3564          * writers can share a single PW lock. */
3565         rc = mode;
3566         if (mode == LCK_PR)
3567                 rc |= LCK_PW;
3568         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3569                              res_id, type, policy, rc, lockh, unref);
3570         if (rc) {
3571                 if (data != NULL)
3572                         osc_set_data_with_check(lockh, data, lflags);
3573                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3574                         ldlm_lock_addref(lockh, LCK_PR);
3575                         ldlm_lock_decref(lockh, LCK_PW);
3576                 }
3577                 RETURN(rc);
3578         }
3579         RETURN(rc);
3580 }
3581
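/* Drop a lock reference.  Group locks are decref'd and cancelled in one step,
 * presumably because they should not linger in the cache once the last user
 * is done with them; everything else is just decref'd and left for the LRU. */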
3582 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3583 {
3584         ENTRY;
3585
3586         if (unlikely(mode == LCK_GROUP))
3587                 ldlm_lock_decref_and_cancel(lockh, mode);
3588         else
3589                 ldlm_lock_decref(lockh, mode);
3590
3591         RETURN(0);
3592 }
3593
3594 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3595                       __u32 mode, struct lustre_handle *lockh)
3596 {
3597         ENTRY;
3598         RETURN(osc_cancel_base(lockh, mode));
3599 }
3600
3601 static int osc_cancel_unused(struct obd_export *exp,
3602                              struct lov_stripe_md *lsm,
3603                              ldlm_cancel_flags_t flags,
3604                              void *opaque)
3605 {
3606         struct obd_device *obd = class_exp2obd(exp);
3607         struct ldlm_res_id res_id, *resp = NULL;
3608
3609         if (lsm != NULL) {
3610                 resp = osc_build_res_name(lsm->lsm_object_id,
3611                                           lsm->lsm_object_seq, &res_id);
3612         }
3613
3614         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3615 }
3616
3617 static int osc_statfs_interpret(const struct lu_env *env,
3618                                 struct ptlrpc_request *req,
3619                                 struct osc_async_args *aa, int rc)
3620 {
3621         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3622         struct obd_statfs *msfs;
3623         __u64 used;
3624         ENTRY;
3625
3626         if (rc == -EBADR)
3627                 /* The request has in fact never been sent
3628                  * due to issues at a higher level (LOV).
3629                  * Exit immediately since the caller is
3630                  * aware of the problem and takes care
3631                  * of the clean up */
3632                  RETURN(rc);
3633
3634         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3635             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3636                 GOTO(out, rc = 0);
3637
3638         if (rc != 0)
3639                 GOTO(out, rc);
3640
3641         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3642         if (msfs == NULL) {
3643                 GOTO(out, rc = -EPROTO);
3644         }
3645
3646         /* Reinitialize the RDONLY and DEGRADED flags at the client
3647          * on each statfs, so they don't stay set permanently. */
3648         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3649
3650         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3651                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3652         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3653                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3654
3655         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3656                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3657         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3658                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3659
3660         /* Add a bit of hysteresis so this flag isn't continually flapping,
3661          * and ensure that new files don't get extremely fragmented due to
3662          * only a small amount of available space in the filesystem.
3663          * We want to set the NOSPC flag when there is less than ~0.1% free
3664          * and clear it when there is at least ~0.2% free space, so:
3665          *                   avail < ~0.1% max          max = avail + used
3666          *            1025 * avail < avail + used       used = blocks - free
3667          *            1024 * avail < used
3668          *            1024 * avail < blocks - free
3669          *                   avail < ((blocks - free) >> 10)
3670          *
3671          * On a very large disk, say 16TB, 0.1% will be 16 GB.  We don't want
3672          * to lose that amount of space, so in those cases we report no space
3673          * left if there is less than 1 GB left. */
3674         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3675         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3676                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3677                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3678         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3679                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3680                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3681
3682         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3683
3684         *aa->aa_oi->oi_osfs = *msfs;
3685 out:
3686         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3687         RETURN(rc);
3688 }
3689
3690 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3691                             __u64 max_age, struct ptlrpc_request_set *rqset)
3692 {
3693         struct ptlrpc_request *req;
3694         struct osc_async_args *aa;
3695         int                    rc;
3696         ENTRY;
3697
3698         /* We could possibly pass max_age in the request (as an absolute
3699          * timestamp or a "seconds.usec ago") so the target can avoid doing
3700          * extra calls into the filesystem if that isn't necessary (e.g.
3701          * during mount that would help a bit).  Having relative timestamps
3702          * is not so great if request processing is slow, while absolute
3703          * timestamps are not ideal because they need time synchronization. */
3704         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3705         if (req == NULL)
3706                 RETURN(-ENOMEM);
3707
3708         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3709         if (rc) {
3710                 ptlrpc_request_free(req);
3711                 RETURN(rc);
3712         }
3713         ptlrpc_request_set_replen(req);
3714         req->rq_request_portal = OST_CREATE_PORTAL;
3715         ptlrpc_at_set_req_timeout(req);
3716
3717         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3718                 /* procfs requests must not wait for recovery, to avoid deadlock */
3719                 req->rq_no_resend = 1;
3720                 req->rq_no_delay = 1;
3721         }
3722
3723         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3724         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3725         aa = ptlrpc_req_async_args(req);
3726         aa->aa_oi = oinfo;
3727
3728         ptlrpc_set_add_req(rqset, req);
3729         RETURN(0);
3730 }
3731
3732 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3733                       __u64 max_age, __u32 flags)
3734 {
3735         struct obd_statfs     *msfs;
3736         struct ptlrpc_request *req;
3737         struct obd_import     *imp = NULL;
3738         int rc;
3739         ENTRY;
3740
3741         /* Since the request might also come from lprocfs, we need to
3742          * sync this with client_disconnect_export() (bug 15684). */
3743         cfs_down_read(&obd->u.cli.cl_sem);
3744         if (obd->u.cli.cl_import)
3745                 imp = class_import_get(obd->u.cli.cl_import);
3746         cfs_up_read(&obd->u.cli.cl_sem);
3747         if (!imp)
3748                 RETURN(-ENODEV);
3749
3750         /* We could possibly pass max_age in the request (as an absolute
3751          * timestamp or a "seconds.usec ago") so the target can avoid doing
3752          * extra calls into the filesystem if that isn't necessary (e.g.
3753          * during mount that would help a bit).  Having relative timestamps
3754          * is not so great if request processing is slow, while absolute
3755          * timestamps are not ideal because they need time synchronization. */
3756         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3757
3758         class_import_put(imp);
3759
3760         if (req == NULL)
3761                 RETURN(-ENOMEM);
3762
3763         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3764         if (rc) {
3765                 ptlrpc_request_free(req);
3766                 RETURN(rc);
3767         }
3768         ptlrpc_request_set_replen(req);
3769         req->rq_request_portal = OST_CREATE_PORTAL;
3770         ptlrpc_at_set_req_timeout(req);
3771
3772         if (flags & OBD_STATFS_NODELAY) {
3773                 /* procfs requests must not wait for recovery, to avoid deadlock */
3774                 req->rq_no_resend = 1;
3775                 req->rq_no_delay = 1;
3776         }
3777
3778         rc = ptlrpc_queue_wait(req);
3779         if (rc)
3780                 GOTO(out, rc);
3781
3782         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3783         if (msfs == NULL) {
3784                 GOTO(out, rc = -EPROTO);
3785         }
3786
3787         *osfs = *msfs;
3788
3789         EXIT;
3790  out:
3791         ptlrpc_req_finished(req);
3792         return rc;
3793 }
3794
3795 /* Retrieve object striping information.
3796  *
3797  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3798  * the maximum number of OST indices which will fit in the user buffer.
3799  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3800  */
3801 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3802 {
3803         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3804         struct lov_user_md_v3 lum, *lumk;
3805         struct lov_user_ost_data_v1 *lmm_objects;
3806         int rc = 0, lum_size;
3807         ENTRY;
3808
3809         if (!lsm)
3810                 RETURN(-ENODATA);
3811
3812         /* we only need the header part from user space to get lmm_magic and
3813          * lmm_stripe_count (the header part is common to v1 and v3) */
3814         lum_size = sizeof(struct lov_user_md_v1);
3815         if (cfs_copy_from_user(&lum, lump, lum_size))
3816                 RETURN(-EFAULT);
3817
3818         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3819             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3820                 RETURN(-EINVAL);
3821
3822         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3823         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3824         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3825         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3826
3827         /* we can use lov_mds_md_size() to compute lum_size
3828          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3829         if (lum.lmm_stripe_count > 0) {
3830                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3831                 OBD_ALLOC(lumk, lum_size);
3832                 if (!lumk)
3833                         RETURN(-ENOMEM);
3834
3835                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3836                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3837                 else
3838                         lmm_objects = &(lumk->lmm_objects[0]);
3839                 lmm_objects->l_object_id = lsm->lsm_object_id;
3840         } else {
3841                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3842                 lumk = &lum;
3843         }
3844
3845         lumk->lmm_object_id = lsm->lsm_object_id;
3846         lumk->lmm_object_seq = lsm->lsm_object_seq;
3847         lumk->lmm_stripe_count = 1;
3848
3849         if (cfs_copy_to_user(lump, lumk, lum_size))
3850                 rc = -EFAULT;
3851
3852         if (lumk != &lum)
3853                 OBD_FREE(lumk, lum_size);
3854
3855         RETURN(rc);
3856 }
3857
3858
3859 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3860                          void *karg, void *uarg)
3861 {
3862         struct obd_device *obd = exp->exp_obd;
3863         struct obd_ioctl_data *data = karg;
3864         int err = 0;
3865         ENTRY;
3866
3867         if (!cfs_try_module_get(THIS_MODULE)) {
3868                 CERROR("Can't get module. Is it alive?\n");
3869                 return -EINVAL;
3870         }
3871         switch (cmd) {
3872         case OBD_IOC_LOV_GET_CONFIG: {
3873                 char *buf;
3874                 struct lov_desc *desc;
3875                 struct obd_uuid uuid;
3876
3877                 buf = NULL;
3878                 len = 0;
3879                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3880                         GOTO(out, err = -EINVAL);
3881
3882                 data = (struct obd_ioctl_data *)buf;
3883
3884                 if (sizeof(*desc) > data->ioc_inllen1) {
3885                         obd_ioctl_freedata(buf, len);
3886                         GOTO(out, err = -EINVAL);
3887                 }
3888
3889                 if (data->ioc_inllen2 < sizeof(uuid)) {
3890                         obd_ioctl_freedata(buf, len);
3891                         GOTO(out, err = -EINVAL);
3892                 }
3893
3894                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3895                 desc->ld_tgt_count = 1;
3896                 desc->ld_active_tgt_count = 1;
3897                 desc->ld_default_stripe_count = 1;
3898                 desc->ld_default_stripe_size = 0;
3899                 desc->ld_default_stripe_offset = 0;
3900                 desc->ld_pattern = 0;
3901                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3902
3903                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3904
3905                 err = cfs_copy_to_user((void *)uarg, buf, len);
3906                 if (err)
3907                         err = -EFAULT;
3908                 obd_ioctl_freedata(buf, len);
3909                 GOTO(out, err);
3910         }
3911         case LL_IOC_LOV_SETSTRIPE:
3912                 err = obd_alloc_memmd(exp, karg);
3913                 if (err > 0)
3914                         err = 0;
3915                 GOTO(out, err);
3916         case LL_IOC_LOV_GETSTRIPE:
3917                 err = osc_getstripe(karg, uarg);
3918                 GOTO(out, err);
3919         case OBD_IOC_CLIENT_RECOVER:
3920                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3921                                             data->ioc_inlbuf1);
3922                 if (err > 0)
3923                         err = 0;
3924                 GOTO(out, err);
3925         case IOC_OSC_SET_ACTIVE:
3926                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3927                                                data->ioc_offset);
3928                 GOTO(out, err);
3929         case OBD_IOC_POLL_QUOTACHECK:
3930                 err = lquota_poll_check(quota_interface, exp,
3931                                         (struct if_quotacheck *)karg);
3932                 GOTO(out, err);
3933         case OBD_IOC_PING_TARGET:
3934                 err = ptlrpc_obd_ping(obd);
3935                 GOTO(out, err);
3936         default:
3937                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3938                        cmd, cfs_curproc_comm());
3939                 GOTO(out, err = -ENOTTY);
3940         }
3941 out:
3942         cfs_module_put(THIS_MODULE);
3943         return err;
3944 }
3945
3946 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3947                         void *key, __u32 *vallen, void *val,
3948                         struct lov_stripe_md *lsm)
3949 {
3950         ENTRY;
3951         if (!vallen || !val)
3952                 RETURN(-EFAULT);
3953
3954         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3955                 __u32 *stripe = val;
3956                 *vallen = sizeof(*stripe);
3957                 *stripe = 0;
3958                 RETURN(0);
3959         } else if (KEY_IS(KEY_LAST_ID)) {
3960                 struct ptlrpc_request *req;
3961                 obd_id                *reply;
3962                 char                  *tmp;
3963                 int                    rc;
3964
3965                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3966                                            &RQF_OST_GET_INFO_LAST_ID);
3967                 if (req == NULL)
3968                         RETURN(-ENOMEM);
3969
3970                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3971                                      RCL_CLIENT, keylen);
3972                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3973                 if (rc) {
3974                         ptlrpc_request_free(req);
3975                         RETURN(rc);
3976                 }
3977
3978                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3979                 memcpy(tmp, key, keylen);
3980
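                /* Fail fast: rq_no_delay avoids waiting for a disconnected
                 * import to recover, and rq_no_resend avoids resending after
                 * reconnect; the caller would rather see an error here. */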
3981                 req->rq_no_delay = req->rq_no_resend = 1;
3982                 ptlrpc_request_set_replen(req);
3983                 rc = ptlrpc_queue_wait(req);
3984                 if (rc)
3985                         GOTO(out, rc);
3986
3987                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3988                 if (reply == NULL)
3989                         GOTO(out, rc = -EPROTO);
3990
3991                 *((obd_id *)val) = *reply;
3992         out:
3993                 ptlrpc_req_finished(req);
3994                 RETURN(rc);
3995         } else if (KEY_IS(KEY_FIEMAP)) {
3996                 struct ptlrpc_request *req;
3997                 struct ll_user_fiemap *reply;
3998                 char *tmp;
3999                 int rc;
4000
4001                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
4002                                            &RQF_OST_GET_INFO_FIEMAP);
4003                 if (req == NULL)
4004                         RETURN(-ENOMEM);
4005
4006                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
4007                                      RCL_CLIENT, keylen);
4008                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4009                                      RCL_CLIENT, *vallen);
4010                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4011                                      RCL_SERVER, *vallen);
4012
4013                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
4014                 if (rc) {
4015                         ptlrpc_request_free(req);
4016                         RETURN(rc);
4017                 }
4018
4019                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4020                 memcpy(tmp, key, keylen);
4021                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4022                 memcpy(tmp, val, *vallen);
4023
4024                 ptlrpc_request_set_replen(req);
4025                 rc = ptlrpc_queue_wait(req);
4026                 if (rc)
4027                         GOTO(out1, rc);
4028
4029                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4030                 if (reply == NULL)
4031                         GOTO(out1, rc = -EPROTO);
4032
4033                 memcpy(val, reply, *vallen);
4034         out1:
4035                 ptlrpc_req_finished(req);
4036
4037                 RETURN(rc);
4038         }
4039
4040         RETURN(-EINVAL);
4041 }
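
/*
 * Usage sketch (illustrative only): a caller would normally reach
 * osc_get_info() through the generic obd_get_info() wrapper, e.g. to ask
 * the OST for the last allocated object id:
 *
 *      obd_id last_id;
 *      __u32 vallen = sizeof(last_id);
 *
 *      rc = obd_get_info(exp, strlen(KEY_LAST_ID) + 1, KEY_LAST_ID,
 *                        &vallen, &last_id, NULL);
 */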
4042
4043 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4044 {
4045         struct llog_ctxt *ctxt;
4046         int rc = 0;
4047         ENTRY;
4048
4049         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4050         if (ctxt) {
4051                 rc = llog_initiator_connect(ctxt);
4052                 llog_ctxt_put(ctxt);
4053         } else {
4054                 /* XXX return an error? skip setting below flags? */
4055         }
4056
4057         cfs_spin_lock(&imp->imp_lock);
4058         imp->imp_server_timeout = 1;
4059         imp->imp_pingable = 1;
4060         cfs_spin_unlock(&imp->imp_lock);
4061         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4062
4063         RETURN(rc);
4064 }
4065
4066 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4067                                           struct ptlrpc_request *req,
4068                                           void *aa, int rc)
4069 {
4070         ENTRY;
4071         if (rc != 0)
4072                 RETURN(rc);
4073
4074         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4075 }
4076
4077 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4078                               void *key, obd_count vallen, void *val,
4079                               struct ptlrpc_request_set *set)
4080 {
4081         struct ptlrpc_request *req;
4082         struct obd_device     *obd = exp->exp_obd;
4083         struct obd_import     *imp = class_exp2cliimp(exp);
4084         char                  *tmp;
4085         int                    rc;
4086         ENTRY;
4087
4088         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4089
4090         if (KEY_IS(KEY_NEXT_ID)) {
4091                 obd_id new_val;
4092                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4093
4094                 if (vallen != sizeof(obd_id))
4095                         RETURN(-ERANGE);
4096                 if (val == NULL)
4097                         RETURN(-EINVAL);
4098
4102                 /* Avoid a race between allocating a new object and
4103                  * setting the next id from the ll_sync thread. */
4104                 cfs_spin_lock(&oscc->oscc_lock);
4105                 new_val = *((obd_id *)val) + 1;
4106                 if (new_val > oscc->oscc_next_id)
4107                         oscc->oscc_next_id = new_val;
4108                 cfs_spin_unlock(&oscc->oscc_lock);
4109                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4110                        exp->exp_obd->obd_name,
4111                        obd->u.cli.cl_oscc.oscc_next_id);
4112
4113                 RETURN(0);
4114         }
4115
4116         if (KEY_IS(KEY_CHECKSUM)) {
4117                 if (vallen != sizeof(int))
4118                         RETURN(-EINVAL);
4119                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4120                 RETURN(0);
4121         }
4122
4123         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4124                 sptlrpc_conf_client_adapt(obd);
4125                 RETURN(0);
4126         }
4127
4128         if (KEY_IS(KEY_FLUSH_CTX)) {
4129                 sptlrpc_import_flush_my_ctx(imp);
4130                 RETURN(0);
4131         }
4132
4133         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4134                 RETURN(-EINVAL);
4135
4136         /* We pass all other commands directly to the OST.  Since nobody
4137            calls osc methods directly and everybody is supposed to go
4138            through LOV, we assume LOV has already validated the values
4139            for us.  The only keys recognised so far are evict_by_nid and
4140            mds_conn.  Even if something bad slips through, the OST will
4141            return -EINVAL anyway. */
4142
4143         if (KEY_IS(KEY_GRANT_SHRINK))
4144                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4145         else
4146                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4147
4148         if (req == NULL)
4149                 RETURN(-ENOMEM);
4150
4151         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4152                              RCL_CLIENT, keylen);
4153         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4154                              RCL_CLIENT, vallen);
4155         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4156         if (rc) {
4157                 ptlrpc_request_free(req);
4158                 RETURN(rc);
4159         }
4160
4161         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4162         memcpy(tmp, key, keylen);
4163         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4164         memcpy(tmp, val, vallen);
4165
4166         if (KEY_IS(KEY_MDS_CONN)) {
4167                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4168
4169                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4170                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4171                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4172                 req->rq_no_delay = req->rq_no_resend = 1;
4173                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4174         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4175                 struct osc_grant_args *aa;
4176                 struct obdo *oa;
4177
4178                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4179                 aa = ptlrpc_req_async_args(req);
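                /* Copy the obdo out of 'val': this request completes
                 * asynchronously via ptlrpcd, so the caller's buffer may be
                 * gone by the time osc_shrink_grant_interpret runs; the
                 * async args own the copy. */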
4180                 OBDO_ALLOC(oa);
4181                 if (!oa) {
4182                         ptlrpc_req_finished(req);
4183                         RETURN(-ENOMEM);
4184                 }
4185                 *oa = ((struct ost_body *)val)->oa;
4186                 aa->aa_oa = oa;
4187                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4188         }
4189
4190         ptlrpc_request_set_replen(req);
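        /* GRANT_SHRINK may arrive without a request set (it is driven by
         * the pinger), so hand it to ptlrpcd; everything else joins the
         * caller's set. */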
4191         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4192                 LASSERT(set != NULL);
4193                 ptlrpc_set_add_req(set, req);
4194                 ptlrpc_check_set(NULL, set);
4195         } else
4196                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4197
4198         RETURN(0);
4199 }
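
/*
 * Usage sketch (illustrative only): toggling wire checksums through the
 * generic obd_set_info_async() wrapper.  KEY_CHECKSUM is handled locally
 * above, so no request set is needed:
 *
 *      int on = 1;
 *
 *      rc = obd_set_info_async(exp, sizeof(KEY_CHECKSUM) - 1, KEY_CHECKSUM,
 *                              sizeof(on), &on, NULL);
 */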
4200
4201
4202 static struct llog_operations osc_size_repl_logops = {
4203         lop_cancel: llog_obd_repl_cancel
4204 };
4205
4206 static struct llog_operations osc_mds_ost_orig_logops;
4207
4208 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4209                            struct obd_device *tgt, struct llog_catid *catid)
4210 {
4211         int rc;
4212         ENTRY;
4213
4214         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4215                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4216         if (rc) {
4217                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4218                 GOTO(out, rc);
4219         }
4220
4221         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4222                         NULL, &osc_size_repl_logops);
4223         if (rc) {
4224                 struct llog_ctxt *ctxt =
4225                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4226                 if (ctxt)
4227                         llog_cleanup(ctxt);
4228                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4229         }
4230         GOTO(out, rc);
4231 out:
4232         if (rc) {
4233                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4234                        obd->obd_name, tgt->obd_name, catid, rc);
4235                 CERROR("logid "LPX64":0x%x\n",
4236                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4237         }
4238         return rc;
4239 }
4240
4241 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4242                          struct obd_device *disk_obd, int *index)
4243 {
4244         struct llog_catid catid;
4245         static char name[32] = CATLIST;
4246         int rc;
4247         ENTRY;
4248
4249         LASSERT(olg == &obd->obd_olg);
4250
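        /* The catalog list is read, possibly updated by __osc_llog_init(),
         * and written back below; the mutex serializes this read-modify-write
         * against other llog users. */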
4251         cfs_mutex_down(&olg->olg_cat_processing);
4252         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4253         if (rc) {
4254                 CERROR("%s: llog_get_cat_list failed: rc = %d\n",
                       obd->obd_name, rc);
4255                 GOTO(out, rc);
4256         }
4257
4258         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4259                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4260                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4261
4262         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4263         if (rc) {
4264                 CERROR("%s: __osc_llog_init failed: rc = %d\n",
                       obd->obd_name, rc);
4265                 GOTO(out, rc);
4266         }
4267
4268         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4269         if (rc) {
4270                 CERROR("%s: llog_put_cat_list failed: rc = %d\n",
                       obd->obd_name, rc);
4271                 GOTO(out, rc);
4272         }
4273
4274  out:
4275         cfs_mutex_up(&olg->olg_cat_processing);
4276
4277         return rc;
4278 }
4279
4280 static int osc_llog_finish(struct obd_device *obd, int count)
4281 {
4282         struct llog_ctxt *ctxt;
4283         int rc = 0, rc2 = 0;
4284         ENTRY;
4285
4286         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4287         if (ctxt)
4288                 rc = llog_cleanup(ctxt);
4289
4290         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4291         if (ctxt)
4292                 rc2 = llog_cleanup(ctxt);
4293         if (!rc)
4294                 rc = rc2;
4295
4296         RETURN(rc);
4297 }
4298
4299 static int osc_reconnect(const struct lu_env *env,
4300                          struct obd_export *exp, struct obd_device *obd,
4301                          struct obd_uuid *cluuid,
4302                          struct obd_connect_data *data,
4303                          void *localdata)
4304 {
4305         struct client_obd *cli = &obd->u.cli;
        ENTRY;
4306
4307         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4308                 long lost_grant;
4309
4310                 client_obd_list_lock(&cli->cl_loi_list_lock);
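                /* Ask for the grant we believe we still hold (available +
                 * dirty); if that is zero, fall back to two full-sized RPCs
                 * worth of pages. */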
4311                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4312                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4313                 lost_grant = cli->cl_lost_grant;
4314                 cli->cl_lost_grant = 0;
4315                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4316
4317                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4318                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4319                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4320                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4321                        " ocd_grant: %d\n", data->ocd_connect_flags,
4322                        data->ocd_version, data->ocd_grant);
4323         }
4324
4325         RETURN(0);
4326 }
4327
4328 static int osc_disconnect(struct obd_export *exp)
4329 {
4330         struct obd_device *obd = class_exp2obd(exp);
4331         struct llog_ctxt  *ctxt;
4332         int rc;
4333
4334         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4335         if (ctxt) {
4336                 if (obd->u.cli.cl_conn_count == 1) {
4337                         /* Flush any remaining cancel messages out to the
4338                          * target */
4339                         llog_sync(ctxt, exp);
4340                 }
4341                 llog_ctxt_put(ctxt);
4342         } else {
4343                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4344                        obd);
4345         }
4346
4347         rc = client_disconnect_export(exp);
4348         /**
4349          * Initially we put del_shrink_grant before disconnect_export, but it
4350          * causes the following problem if setup (connect) and cleanup
4351          * (disconnect) are tangled together.
4352          *      connect p1                     disconnect p2
4353          *   ptlrpc_connect_import
4354          *     ...............               class_manual_cleanup
4355          *                                     osc_disconnect
4356          *                                     del_shrink_grant
4357          *   ptlrpc_connect_interrupt
4358          *     init_grant_shrink
4359          *   add this client to shrink list
4360          *                                      cleanup_osc
4361          * Bang! pinger trigger the shrink.
4362          * So the osc should be disconnected from the shrink list, after we
4363          * are sure the import has been destroyed. BUG18662
4364          */
4365         if (obd->u.cli.cl_import == NULL)
4366                 osc_del_shrink_grant(&obd->u.cli);
4367         return rc;
4368 }
4369
4370 static int osc_import_event(struct obd_device *obd,
4371                             struct obd_import *imp,
4372                             enum obd_import_event event)
4373 {
4374         struct client_obd *cli;
4375         int rc = 0;
4376
4377         ENTRY;
4378         LASSERT(imp->imp_obd == obd);
4379
4380         switch (event) {
4381         case IMP_EVENT_DISCON: {
4382                 /* Only do this on the MDS OSC's */
4383                 if (imp->imp_server_timeout) {
4384                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4385
4386                         cfs_spin_lock(&oscc->oscc_lock);
4387                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4388                         cfs_spin_unlock(&oscc->oscc_lock);
4389                 }
4390                 cli = &obd->u.cli;
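                /* Grant is meaningless across a disconnect; it will be
                 * renegotiated on reconnect (see osc_reconnect() and
                 * IMP_EVENT_OCD below). */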
4391                 client_obd_list_lock(&cli->cl_loi_list_lock);
4392                 cli->cl_avail_grant = 0;
4393                 cli->cl_lost_grant = 0;
4394                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4395                 break;
4396         }
4397         case IMP_EVENT_INACTIVE: {
4398                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4399                 break;
4400         }
4401         case IMP_EVENT_INVALIDATE: {
4402                 struct ldlm_namespace *ns = obd->obd_namespace;
4403                 struct lu_env         *env;
4404                 int                    refcheck;
4405
4406                 env = cl_env_get(&refcheck);
4407                 if (!IS_ERR(env)) {
4408                         /* Reset grants */
4409                         cli = &obd->u.cli;
4410                         client_obd_list_lock(&cli->cl_loi_list_lock);
4411                         /* all pages go to failing rpcs due to the invalid
4412                          * import */
4413                         osc_check_rpcs(env, cli);
4414                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4415
4416                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4417                         cl_env_put(env, &refcheck);
4418                 } else
4419                         rc = PTR_ERR(env);
4420                 break;
4421         }
4422         case IMP_EVENT_ACTIVE: {
4423                 /* Only do this on the MDS OSC's */
4424                 if (imp->imp_server_timeout) {
4425                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4426
4427                         cfs_spin_lock(&oscc->oscc_lock);
4428                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4429                         cfs_spin_unlock(&oscc->oscc_lock);
4430                 }
4431                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4432                 break;
4433         }
4434         case IMP_EVENT_OCD: {
4435                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4436
4437                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4438                         osc_init_grant(&obd->u.cli, ocd);
4439
4440                 /* See bug 7198 */
4441                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4442                         imp->imp_client->cli_request_portal =
                                                OST_REQUEST_PORTAL;
4443
4444                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4445                 break;
4446         }
4447         default:
4448                 CERROR("Unknown import event %d\n", event);
4449                 LBUG();
4450         }
4451         RETURN(rc);
4452 }
4453
4454 /**
4455  * Determine whether the lock can be canceled before replaying the lock
4456  * during recovery, see bug16774 for detailed information.
4457  *
4458  * \retval zero the lock can't be canceled
4459  * \retval other ok to cancel
4460  */
4461 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4462 {
4463         check_res_locked(lock->l_resource);
4464
4465         /*
4466          * Cancel all unused extent locks granted in LCK_PR or LCK_CR mode.
4467          *
4468          * XXX as a future improvement, we can also cancel unused write lock
4469          * if it doesn't have dirty data and active mmaps.
4470          */
4471         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4472             (lock->l_granted_mode == LCK_PR ||
4473              lock->l_granted_mode == LCK_CR) &&
4474             (osc_dlm_lock_pageref(lock) == 0))
4475                 RETURN(1);
4476
4477         RETURN(0);
4478 }
4479
4480 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4481 {
4482         int rc;
4483         ENTRY;
4484
4486         rc = ptlrpcd_addref();
4487         if (rc)
4488                 RETURN(rc);
4489
4490         rc = client_obd_setup(obd, lcfg);
4491         if (rc) {
4492                 ptlrpcd_decref();
4493         } else {
4494                 struct lprocfs_static_vars lvars = { 0 };
4495                 struct client_obd *cli = &obd->u.cli;
4496
4497                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4498                 lprocfs_osc_init_vars(&lvars);
4499                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4500                         lproc_osc_attach_seqstat(obd);
4501                         sptlrpc_lprocfs_cliobd_attach(obd);
4502                         ptlrpc_lprocfs_register_obd(obd);
4503                 }
4504
4505                 oscc_init(obd);
4506                 /* We need to allocate a few extra requests, because
4507                    brw_interpret tries to create new requests before freeing
4508                    previous ones.  Ideally we would reserve 2x
4509                    max_rpcs_in_flight, but that is probably more RAM than we
4510                    need, so reserving just 2 extra should still work. */
4511                 cli->cl_import->imp_rq_pool =
4512                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4513                                             OST_MAXREQSIZE,
4514                                             ptlrpc_add_rqs_to_pool);
4515
4516                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4517                 cfs_sema_init(&cli->cl_grant_sem, 1);
4518
4519                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4520         }
4521
4522         RETURN(rc);
4523 }
4524
4525 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4526 {
4527         int rc = 0;
4528         ENTRY;
4529
4530         switch (stage) {
4531         case OBD_CLEANUP_EARLY: {
4532                 struct obd_import *imp;
4533                 imp = obd->u.cli.cl_import;
4534                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4535                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4536                 ptlrpc_deactivate_import(imp);
4537                 cfs_spin_lock(&imp->imp_lock);
4538                 imp->imp_pingable = 0;
4539                 cfs_spin_unlock(&imp->imp_lock);
4540                 break;
4541         }
4542         case OBD_CLEANUP_EXPORTS: {
4543                 /* If we set up but never connected, the
4544                    client import will not have been cleaned. */
4545                 if (obd->u.cli.cl_import) {
4546                         struct obd_import *imp;
4547                         cfs_down_write(&obd->u.cli.cl_sem);
4548                         imp = obd->u.cli.cl_import;
4549                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4550                                obd->obd_name);
4551                         ptlrpc_invalidate_import(imp);
4552                         if (imp->imp_rq_pool) {
4553                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4554                                 imp->imp_rq_pool = NULL;
4555                         }
4556                         class_destroy_import(imp);
4557                         cfs_up_write(&obd->u.cli.cl_sem);
4558                         obd->u.cli.cl_import = NULL;
4559                 }
4560                 rc = obd_llog_finish(obd, 0);
4561                 if (rc != 0)
4562                         CERROR("failed to cleanup llogging subsystems\n");
4563                 break;
4564                 }
4565         }
4566         RETURN(rc);
4567 }
4568
4569 int osc_cleanup(struct obd_device *obd)
4570 {
4571         int rc;
4572
4573         ENTRY;
4574         ptlrpc_lprocfs_unregister_obd(obd);
4575         lprocfs_obd_cleanup(obd);
4576
4577         /* free memory of osc quota cache */
4578         lquota_cleanup(quota_interface, obd);
4579
4580         rc = client_obd_cleanup(obd);
4581
4582         ptlrpcd_decref();
4583         RETURN(rc);
4584 }
4585
4586 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4587 {
4588         struct lprocfs_static_vars lvars = { 0 };
4589         int rc = 0;
4590
4591         lprocfs_osc_init_vars(&lvars);
4592
4593         switch (lcfg->lcfg_command) {
4594         default:
4595                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4596                                               lcfg, obd);
4597                 if (rc > 0)
4598                         rc = 0;
4599                 break;
4600         }
4601
4602         return(rc);
4603 }
4604
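/* Tag every pending write page for sync-fs writeout and arm the sync-fs
 * waiter; the upcall recorded from 'oinfo' fires once the tagged pages
 * have drained. */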
4605 static int osc_sync_fs(struct obd_export *exp, struct obd_info *oinfo,
4606                        int wait)
4607 {
4608         struct obd_device *obd = class_exp2obd(exp);
4609         struct client_obd *cli;
4610         struct lov_oinfo *loi;
4611         struct lov_oinfo *tloi;
4612         struct osc_async_page *oap;
4613         struct osc_async_page *toap;
4614         struct loi_oap_pages *lop;
4615         struct lu_env *env;
4616         int refcheck;
4617         int rc = 0;
4618         ENTRY;
4619
4620         env = cl_env_get(&refcheck);
4621         if (IS_ERR(env))
4622                 RETURN(PTR_ERR(env));
4623
4624         cli = &obd->u.cli;
4625         client_obd_list_lock(&cli->cl_loi_list_lock);
4626         cli->cl_sf_wait.sfw_oi = oinfo;
4627         cli->cl_sf_wait.sfw_upcall = oinfo->oi_cb_up;
4628         cli->cl_sf_wait.started = 1;
4629         /* Mark every pending write page for sync-fs writeout. */
4630         cfs_list_for_each_entry_safe(loi, tloi, &cli->cl_loi_write_list,
4631                                      loi_write_item) {
4632                 lop = &loi->loi_write_lop;
4633                 cfs_list_for_each_entry_safe(oap, toap, &lop->lop_pending,
4634                                              oap_pending_item)
4635                         osc_set_async_flags_base(cli, loi, oap, ASYNC_SYNCFS);
4636         }
4637         osc_check_rpcs(env, cli);
4638         osc_wake_sync_fs(cli);
4639         client_obd_list_unlock(&cli->cl_loi_list_lock);
4640         cl_env_put(env, &refcheck);
4641
4642         RETURN(rc);
4643 }
4644
4645 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4646 {
4647         return osc_process_config_base(obd, buf);
4648 }
4649
4650 struct obd_ops osc_obd_ops = {
4651         .o_owner                = THIS_MODULE,
4652         .o_setup                = osc_setup,
4653         .o_precleanup           = osc_precleanup,
4654         .o_cleanup              = osc_cleanup,
4655         .o_add_conn             = client_import_add_conn,
4656         .o_del_conn             = client_import_del_conn,
4657         .o_connect              = client_connect_import,
4658         .o_reconnect            = osc_reconnect,
4659         .o_disconnect           = osc_disconnect,
4660         .o_statfs               = osc_statfs,
4661         .o_statfs_async         = osc_statfs_async,
4662         .o_packmd               = osc_packmd,
4663         .o_unpackmd             = osc_unpackmd,
4664         .o_precreate            = osc_precreate,
4665         .o_create               = osc_create,
4666         .o_create_async         = osc_create_async,
4667         .o_destroy              = osc_destroy,
4668         .o_getattr              = osc_getattr,
4669         .o_getattr_async        = osc_getattr_async,
4670         .o_setattr              = osc_setattr,
4671         .o_setattr_async        = osc_setattr_async,
4672         .o_brw                  = osc_brw,
4673         .o_punch                = osc_punch,
4674         .o_sync                 = osc_sync,
4675         .o_enqueue              = osc_enqueue,
4676         .o_change_cbdata        = osc_change_cbdata,
4677         .o_find_cbdata          = osc_find_cbdata,
4678         .o_cancel               = osc_cancel,
4679         .o_cancel_unused        = osc_cancel_unused,
4680         .o_iocontrol            = osc_iocontrol,
4681         .o_get_info             = osc_get_info,
4682         .o_set_info_async       = osc_set_info_async,
4683         .o_import_event         = osc_import_event,
4684         .o_llog_init            = osc_llog_init,
4685         .o_llog_finish          = osc_llog_finish,
4686         .o_process_config       = osc_process_config,
4687         .o_sync_fs              = osc_sync_fs,
4688 };
4689
4690 extern struct lu_kmem_descr osc_caches[];
4691 extern cfs_spinlock_t       osc_ast_guard;
4692 extern cfs_lock_class_key_t osc_ast_guard_class;
4693
4694 int __init osc_init(void)
4695 {
4696         struct lprocfs_static_vars lvars = { 0 };
4697         int rc;
4698         ENTRY;
4699
4700         /* print an address of _any_ initialized kernel symbol from this
4701          * module, to allow debugging with gdb that doesn't support data
4702          * symbols from modules.*/
4703         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4704
4705         rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);
4706
4707         lprocfs_osc_init_vars(&lvars);
4708
4709         cfs_request_module("lquota");
4710         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4711         lquota_init(quota_interface);
4712         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4713
4714         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4715                                  LUSTRE_OSC_NAME, &osc_device_type);
4716         if (rc) {
4717                 if (quota_interface)
4718                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4719                 lu_kmem_fini(osc_caches);
4720                 RETURN(rc);
4721         }
4722
4723         cfs_spin_lock_init(&osc_ast_guard);
4724         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4725
4726         osc_mds_ost_orig_logops = llog_lvfs_ops;
4727         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4728         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4729         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4730         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4731
4732         RETURN(rc);
4733 }
4734
4735 #ifdef __KERNEL__
4736 static void /*__exit*/ osc_exit(void)
4737 {
4738         lu_device_type_fini(&osc_device_type);
4739
4740         lquota_exit(quota_interface);
4741         if (quota_interface)
4742                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4743
4744         class_unregister_type(LUSTRE_OSC_NAME);
4745         lu_kmem_fini(osc_caches);
4746 }
4747
4748 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4749 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4750 MODULE_LICENSE("GPL");
4751
4752 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4753 #endif