Whamcloud - gitweb
LU-4017 quota: add project id support
[fs/lustre-release.git] / lustre / osp / osp_sync.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/osp/osp_sync.c
33  *
34  * Lustre OST Proxy Device
35  *
36  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
37  * Author: Mikhail Pershin <mike.pershin@intel.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_MDS
41
42 #include <linux/kthread.h>
43 #include <lustre_log.h>
44 #include <lustre_update.h>
45 #include "osp_internal.h"
46
47 static int osp_sync_id_traction_init(struct osp_device *d);
48 static void osp_sync_id_traction_fini(struct osp_device *d);
49 static __u64 osp_sync_id_get(struct osp_device *d, __u64 id);
50 static void osp_sync_remove_from_tracker(struct osp_device *d);
51
52 /*
 * this is a component of OSP implementing synchronization between MDS and OST:
 * it llogs all interesting changes (currently uid/gid change and object
 * destroy) atomically, then makes sure changes hit OST storage
56  *
57  * we have 4 queues of work:
58  *
59  * the first queue is llog itself, once read a change is stored in 2nd queue
60  * in form of RPC (but RPC isn't fired yet).
61  *
62  * the second queue (opd_syn_waiting_for_commit) holds changes awaiting local
63  * commit. once change is committed locally it migrates onto 3rd queue.
64  *
65  * the third queue (opd_syn_committed_here) holds changes committed locally,
66  * but not sent to OST (as the pipe can be full). once pipe becomes non-full
 * we take a change from the queue and fire the corresponding RPC.
 *
 * once RPC is reported committed by OST (using regular last_committed mech.)
 * the change jumps into 4th queue (opd_syn_committed_there), now we can
 * cancel the corresponding llog record and release RPC
72  *
73  * opd_syn_changes is a number of unread llog records (to be processed).
74  * notice this number doesn't include llog records from previous boots.
75  * with OSP_SYN_THRESHOLD we try to batch processing a bit (TO BE IMPLEMENTED)
76  *
77  * opd_syn_rpc_in_progress is a number of requests in 2-4 queues.
78  * we control this with OSP_MAX_IN_PROGRESS so that OSP don't consume
79  * too much memory -- how to deal with 1000th OSTs ? batching could help?
80  *
81  * opd_syn_rpc_in_flight is a number of RPC in flight.
82  * we control this with OSP_MAX_IN_FLIGHT
83  */
84
85 /* XXX: do math to learn reasonable threshold
86  * should it be ~ number of changes fitting bulk? */
87
88 #define OSP_SYN_THRESHOLD       10
89 #define OSP_MAX_IN_FLIGHT       8
90 #define OSP_MAX_IN_PROGRESS     4096
91
92 #define OSP_JOB_MAGIC           0x26112005
93
94 struct osp_job_req_args {
95         /** bytes reserved for ptlrpc_replay_req() */
96         struct ptlrpc_replay_async_args jra_raa;
97         struct list_head                jra_committed_link;
98         struct list_head                jra_inflight_link;
99         struct llog_cookie              jra_lcookie;
100         __u32                           jra_magic;
101 };
102
103 static inline int osp_sync_running(struct osp_device *d)
104 {
105         return !!(d->opd_syn_thread.t_flags & SVC_RUNNING);
106 }
107
108 /**
109  * Check status: whether OSP thread has stopped
110  *
111  * \param[in] d         OSP device
112  *
113  * \retval 0            still running
114  * \retval 1            stopped
115  */
116 static inline int osp_sync_stopped(struct osp_device *d)
117 {
118         return !!(d->opd_syn_thread.t_flags & SVC_STOPPED);
119 }
120
121 /*
122  ** Check for new changes to sync
123  *
124  * \param[in] d         OSP device
125  *
126  * \retval 1            there are changes
127  * \retval 0            there are no changes
128  */
129 static inline int osp_sync_has_new_job(struct osp_device *d)
130 {
131         return ((d->opd_syn_last_processed_id < d->opd_syn_last_used_id) &&
132                 (d->opd_syn_last_processed_id < d->opd_syn_last_committed_id))
133                 || (d->opd_syn_prev_done == 0);
134 }
135
136 static inline int osp_sync_inflight_conflict(struct osp_device *d,
137                                              struct llog_rec_hdr *h)
138 {
139         struct osp_job_req_args *jra;
140         struct ost_id            ostid;
141         int                      conflict = 0;
142
143         if (h == NULL || h->lrh_type == LLOG_GEN_REC ||
144             list_empty(&d->opd_syn_inflight_list))
145                 return conflict;
146
147         memset(&ostid, 0, sizeof(ostid));
148         switch (h->lrh_type) {
149         case MDS_UNLINK_REC:
150                 ostid_set_seq(&ostid, ((struct llog_unlink_rec *)h)->lur_oseq);
151                 ostid_set_id(&ostid, ((struct llog_unlink_rec *)h)->lur_oid);
152                 break;
153         case MDS_UNLINK64_REC:
154                 fid_to_ostid(&((struct llog_unlink64_rec *)h)->lur_fid, &ostid);
155                 break;
156         case MDS_SETATTR64_REC:
157                 ostid = ((struct llog_setattr64_rec *)h)->lsr_oi;
158                 break;
159         default:
160                 LBUG();
161         }
162
163         spin_lock(&d->opd_syn_lock);
164         list_for_each_entry(jra, &d->opd_syn_inflight_list, jra_inflight_link) {
165                 struct ptlrpc_request   *req;
166                 struct ost_body         *body;
167
168                 LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
169
170                 req = container_of((void *)jra, struct ptlrpc_request,
171                                    rq_async_args);
172                 body = req_capsule_client_get(&req->rq_pill,
173                                               &RMF_OST_BODY);
174                 LASSERT(body);
175
176                 if (memcmp(&ostid, &body->oa.o_oi, sizeof(ostid)) == 0) {
177                         conflict = 1;
178                         break;
179                 }
180         }
181         spin_unlock(&d->opd_syn_lock);
182
183         return conflict;
184 }
185
186 static inline int osp_sync_low_in_progress(struct osp_device *d)
187 {
188         return atomic_read(&d->opd_syn_rpc_in_progress) <
189                 d->opd_syn_max_rpc_in_progress;
190 }
191
192 /**
193  * Check for room in the network pipe to OST
194  *
195  * \param[in] d         OSP device
196  *
197  * \retval 1            there is room
198  * \retval 0            no room, the pipe is full
199  */
200 static inline int osp_sync_low_in_flight(struct osp_device *d)
201 {
202         return atomic_read(&d->opd_syn_rpc_in_flight) <
203                 d->opd_syn_max_rpc_in_flight;
204 }
205
206 /**
207  * Wake up check for the main sync thread
208  *
209  * \param[in] d         OSP device
210  *
211  * \retval 1            time to wake up
212  * \retval 0            no need to wake up
213  */
214 static inline int osp_sync_has_work(struct osp_device *d)
215 {
216         /* has new/old changes and low in-progress? */
217         if (osp_sync_has_new_job(d) && osp_sync_low_in_progress(d) &&
218             osp_sync_low_in_flight(d) && d->opd_imp_connected)
219                 return 1;
220
221         /* has remotely committed? */
222         if (!list_empty(&d->opd_syn_committed_there))
223                 return 1;
224
225         return 0;
226 }
227
228 #define osp_sync_check_for_work(d)                      \
229 {                                                       \
230         if (osp_sync_has_work(d)) {                     \
231                 wake_up(&d->opd_syn_waitq);    \
232         }                                               \
233 }
234
235 void __osp_sync_check_for_work(struct osp_device *d)
236 {
237         osp_sync_check_for_work(d);
238 }
239
240 static inline __u64 osp_sync_correct_id(struct osp_device *d,
241                                         struct llog_rec_hdr *rec)
242 {
243         /*
244          * llog use cyclic store with 32 bit lrh_id
245          * so overflow lrh_id is possible. Range between
246          * last_processed and last_committed is less than
247          * 64745 ^ 2 and less than 2^32 - 1
248          */
249         __u64 correct_id = d->opd_syn_last_committed_id;
250
251         if ((correct_id & 0xffffffffULL) < rec->lrh_id)
252                 correct_id -= 0x100000000ULL;
253
254         correct_id &= ~0xffffffffULL;
255         correct_id |= rec->lrh_id;
256
257         return correct_id;
258 }
259 /**
260  * Check and return ready-for-new status.
261  *
262  * The thread processing llog record uses this function to check whether
263  * it's time to take another record and process it. The number of conditions
264  * must be met: the connection should be ready, RPCs in flight not exceeding
265  * the limit, the record is committed locally, etc (see the lines below).
266  *
267  * \param[in] d         OSP device
268  * \param[in] rec       next llog record to process
269  *
270  * \retval 0            not ready
271  * \retval 1            ready
272  */
273 static inline int osp_sync_can_process_new(struct osp_device *d,
274                                            struct llog_rec_hdr *rec)
275 {
276         LASSERT(d);
277
278         if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
279                 return 0;
280         if (unlikely(osp_sync_inflight_conflict(d, rec)))
281                 return 0;
282         if (!osp_sync_low_in_progress(d))
283                 return 0;
284         if (!osp_sync_low_in_flight(d))
285                 return 0;
286         if (!d->opd_imp_connected)
287                 return 0;
288         if (d->opd_syn_prev_done == 0)
289                 return 1;
290         if (atomic_read(&d->opd_syn_changes) == 0)
291                 return 0;
292         if (rec == NULL ||
293             osp_sync_correct_id(d, rec) <= d->opd_syn_last_committed_id)
294                 return 1;
295         return 0;
296 }
297
298 /**
299  * Declare intention to add a new change.
300  *
301  * With regard to OSD API, we have to declare any changes ahead. In this
302  * case we declare an intention to add a llog record representing the
303  * change on the local storage.
304  *
305  * \param[in] env       LU environment provided by the caller
306  * \param[in] o         OSP object
307  * \param[in] type      type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
308  * \param[in] th        transaction handle (local)
309  *
310  * \retval 0            on success
311  * \retval negative     negated errno on error
312  */
313 int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
314                          llog_op_type type, struct thandle *th)
315 {
316         struct osp_thread_info  *osi = osp_env_info(env);
317         struct osp_device       *d = lu2osp_dev(o->opo_obj.do_lu.lo_dev);
318         struct llog_ctxt        *ctxt;
319         struct thandle          *storage_th;
320         int                      rc;
321
322         ENTRY;
323
324         /* it's a layering violation, to access internals of th,
325          * but we can do this as a sanity check, for a while */
326         LASSERT(th->th_top != NULL);
327         storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
328         if (IS_ERR(storage_th))
329                 RETURN(PTR_ERR(storage_th));
330
331         switch (type) {
332         case MDS_UNLINK64_REC:
333                 osi->osi_hdr.lrh_len = sizeof(struct llog_unlink64_rec);
334                 break;
335         case MDS_SETATTR64_REC:
336                 osi->osi_hdr.lrh_len = sizeof(struct llog_setattr64_rec_v2);
337                 break;
338         default:
339                 LBUG();
340         }
341
342         /* we want ->dt_trans_start() to allocate per-thandle structure */
343         storage_th->th_tags |= LCT_OSP_THREAD;
344
345         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
346         LASSERT(ctxt);
347
348         rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr,
349                               storage_th);
350         llog_ctxt_put(ctxt);
351
352         RETURN(rc);
353 }
354
355 /**
356  * Generate a llog record for a given change.
357  *
358  * Generates a llog record for the change passed. The change can be of two
359  * types: unlink and setattr. The record gets an ID which later will be
360  * used to track commit status of the change. For unlink changes, the caller
361  * can supply a starting FID and the count of the objects to destroy. For
362  * setattr the caller should apply attributes to apply.
363  *
364  *
365  * \param[in] env       LU environment provided by the caller
366  * \param[in] d         OSP device
367  * \param[in] fid       fid of the object the change should be applied to
368  * \param[in] type      type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
369  * \param[in] count     count of objects to destroy
370  * \param[in] th        transaction handle (local)
371  * \param[in] attr      attributes for setattr
372  *
373  * \retval 0            on success
374  * \retval negative     negated errno on error
375  */
376 static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
377                             const struct lu_fid *fid, llog_op_type type,
378                             int count, struct thandle *th,
379                             const struct lu_attr *attr)
380 {
381         struct osp_thread_info  *osi = osp_env_info(env);
382         struct llog_ctxt        *ctxt;
383         struct osp_txn_info     *txn;
384         struct thandle          *storage_th;
385         int                      rc;
386
387         ENTRY;
388
389         /* it's a layering violation, to access internals of th,
390          * but we can do this as a sanity check, for a while */
391         LASSERT(th->th_top != NULL);
392         storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
393         if (IS_ERR(storage_th))
394                 RETURN(PTR_ERR(storage_th));
395
396         switch (type) {
397         case MDS_UNLINK64_REC:
398                 osi->osi_hdr.lrh_len = sizeof(osi->osi_unlink);
399                 osi->osi_hdr.lrh_type = MDS_UNLINK64_REC;
400                 osi->osi_unlink.lur_fid  = *fid;
401                 osi->osi_unlink.lur_count = count;
402                 break;
403         case MDS_SETATTR64_REC:
404                 rc = fid_to_ostid(fid, &osi->osi_oi);
405                 LASSERT(rc == 0);
406                 osi->osi_hdr.lrh_len = sizeof(osi->osi_setattr);
407                 osi->osi_hdr.lrh_type = MDS_SETATTR64_REC;
408                 osi->osi_setattr.lsr_oi  = osi->osi_oi;
409                 LASSERT(attr);
410                 osi->osi_setattr.lsr_uid = attr->la_uid;
411                 osi->osi_setattr.lsr_gid = attr->la_gid;
412                 osi->osi_setattr.lsr_projid = attr->la_projid;
413                 osi->osi_setattr.lsr_valid =
414                         ((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
415                         ((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0) |
416                         ((attr->la_valid & LA_PROJID) ? OBD_MD_FLPROJID : 0);
417                 break;
418         default:
419                 LBUG();
420         }
421
422         txn = osp_txn_info(&storage_th->th_ctx);
423         LASSERT(txn);
424
425         txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id);
426         osi->osi_hdr.lrh_id = (txn->oti_current_id & 0xffffffffULL);
427         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
428         if (ctxt == NULL)
429                 RETURN(-ENOMEM);
430
431         rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie,
432                       storage_th);
433         llog_ctxt_put(ctxt);
434
435         if (likely(rc >= 0)) {
436                 CDEBUG(D_OTHER, "%s: new record "DOSTID":%lu/%lu: %d\n",
437                        d->opd_obd->obd_name,
438                        POSTID(&osi->osi_cookie.lgc_lgl.lgl_oi),
439                        (unsigned long)osi->osi_cookie.lgc_lgl.lgl_ogen,
440                        (unsigned long)osi->osi_cookie.lgc_index, rc);
441                 atomic_inc(&d->opd_syn_changes);
442         }
443         /* return 0 always here, error case just cause no llog record */
444         RETURN(0);
445 }
446
447 int osp_sync_add(const struct lu_env *env, struct osp_object *o,
448                  llog_op_type type, struct thandle *th,
449                  const struct lu_attr *attr)
450 {
451         return osp_sync_add_rec(env, lu2osp_dev(o->opo_obj.do_lu.lo_dev),
452                                 lu_object_fid(&o->opo_obj.do_lu), type, 1,
453                                 th, attr);
454 }
455
456 int osp_sync_gap(const struct lu_env *env, struct osp_device *d,
457                         struct lu_fid *fid, int lost, struct thandle *th)
458 {
459         return osp_sync_add_rec(env, d, fid, MDS_UNLINK64_REC, lost, th, NULL);
460 }
461
462 /*
463  * it's quite obvious we can't maintain all the structures in the memory:
464  * while OST is down, MDS can be processing thousands and thousands of unlinks
465  * filling persistent llogs and in-core respresentation
466  *
467  * this doesn't scale at all. so we need basically the following:
468  * a) destroy/setattr append llog records
469  * b) once llog has grown to X records, we process first Y committed records
470  *
471  *  once record R is found via llog_process(), it becomes committed after any
472  *  subsequent commit callback (at the most)
473  */
474
475 /**
476  * ptlrpc commit callback.
477  *
478  * The callback is called by PTLRPC when a RPC is reported committed by the
479  * target (OST). We register the callback for the every RPC applying a change
480  * from the llog. This way we know then the llog records can be cancelled.
481  * Notice the callback can be called when OSP is finishing. We can detect this
482  * checking that actual transno in the request is less or equal of known
483  * committed transno (see osp_sync_process_committed() for the details).
484  * XXX: this is pretty expensive and can be improved later using batching.
485  *
486  * \param[in] req       request
487  */
488 static void osp_sync_request_commit_cb(struct ptlrpc_request *req)
489 {
490         struct osp_device *d = req->rq_cb_data;
491         struct osp_job_req_args *jra;
492
493         CDEBUG(D_HA, "commit req %p, transno %llu\n", req, req->rq_transno);
494
495         if (unlikely(req->rq_transno == 0))
496                 return;
497
498         /* do not do any opd_dyn_rpc_* accounting here
499          * it's done in osp_sync_interpret sooner or later */
500         LASSERT(d);
501
502         jra = ptlrpc_req_async_args(req);
503         LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
504         LASSERT(list_empty(&jra->jra_committed_link));
505
506         ptlrpc_request_addref(req);
507
508         spin_lock(&d->opd_syn_lock);
509         list_add(&jra->jra_committed_link, &d->opd_syn_committed_there);
510         spin_unlock(&d->opd_syn_lock);
511
512         /* XXX: some batching wouldn't hurt */
513         wake_up(&d->opd_syn_waitq);
514 }
515
516 /**
517  * RPC interpretation callback.
518  *
519  * The callback is called by ptlrpc when RPC is replied. Now we have to decide
520  * whether we should:
521  *  - put request on a special list to wait until it's committed by the target,
522  *    if the request is successful
523  *  - schedule llog record cancel if no target object is found
524  *  - try later (essentially after reboot) in case of unexpected error
525  *
526  * \param[in] env       LU environment provided by the caller
527  * \param[in] req       request replied
528  * \param[in] aa        callback data
529  * \param[in] rc        result of RPC
530  *
531  * \retval 0            always
532  */
533 static int osp_sync_interpret(const struct lu_env *env,
534                               struct ptlrpc_request *req, void *aa, int rc)
535 {
536         struct osp_device *d = req->rq_cb_data;
537         struct osp_job_req_args *jra = aa;
538
539         if (jra->jra_magic != OSP_JOB_MAGIC) {
540                 DEBUG_REQ(D_ERROR, req, "bad magic %u\n", jra->jra_magic);
541                 LBUG();
542         }
543         LASSERT(d);
544
545         CDEBUG(D_HA, "reply req %p/%d, rc %d, transno %u\n", req,
546                atomic_read(&req->rq_refcount),
547                rc, (unsigned) req->rq_transno);
548         LASSERT(rc || req->rq_transno);
549
550         if (rc == -ENOENT) {
551                 /*
552                  * we tried to destroy object or update attributes,
553                  * but object doesn't exist anymore - cancell llog record
554                  */
555                 LASSERT(req->rq_transno == 0);
556                 LASSERT(list_empty(&jra->jra_committed_link));
557
558                 ptlrpc_request_addref(req);
559
560                 spin_lock(&d->opd_syn_lock);
561                 list_add(&jra->jra_committed_link, &d->opd_syn_committed_there);
562                 spin_unlock(&d->opd_syn_lock);
563
564                 wake_up(&d->opd_syn_waitq);
565         } else if (rc) {
566                 struct obd_import *imp = req->rq_import;
567                 /*
568                  * error happened, we'll try to repeat on next boot ?
569                  */
570                 LASSERTF(req->rq_transno == 0 ||
571                          req->rq_import_generation < imp->imp_generation,
572                          "transno %llu, rc %d, gen: req %d, imp %d\n",
573                          req->rq_transno, rc, req->rq_import_generation,
574                          imp->imp_generation);
575                 if (req->rq_transno == 0) {
576                         /* this is the last time we see the request
577                          * if transno is not zero, then commit cb
578                          * will be called at some point */
579                         LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) > 0);
580                         atomic_dec(&d->opd_syn_rpc_in_progress);
581                 }
582
583                 wake_up(&d->opd_syn_waitq);
584         } else if (d->opd_pre != NULL &&
585                    unlikely(d->opd_pre_status == -ENOSPC)) {
586                 /*
587                  * if current status is -ENOSPC (lack of free space on OST)
588                  * then we should poll OST immediately once object destroy
589                  * is replied
590                  */
591                 osp_statfs_need_now(d);
592         }
593
594         spin_lock(&d->opd_syn_lock);
595         list_del_init(&jra->jra_inflight_link);
596         spin_unlock(&d->opd_syn_lock);
597         LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) > 0);
598         atomic_dec(&d->opd_syn_rpc_in_flight);
599         if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
600                 wake_up(&d->opd_syn_barrier_waitq);
601         CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
602                d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
603                atomic_read(&d->opd_syn_rpc_in_progress));
604
605         osp_sync_check_for_work(d);
606
607         return 0;
608 }
609
610 /*
611  ** Add request to ptlrpc queue.
612  *
613  * This is just a tiny helper function to put the request on the sending list
614  *
615  * \param[in] d         OSP device
616  * \param[in] llh       llog handle where the record is stored
617  * \param[in] h         llog record
618  * \param[in] req       request
619  */
620 static void osp_sync_send_new_rpc(struct osp_device *d,
621                                   struct llog_handle *llh,
622                                   struct llog_rec_hdr *h,
623                                   struct ptlrpc_request *req)
624 {
625         struct osp_job_req_args *jra;
626
627         LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) <=
628                 d->opd_syn_max_rpc_in_flight);
629
630         jra = ptlrpc_req_async_args(req);
631         jra->jra_magic = OSP_JOB_MAGIC;
632         jra->jra_lcookie.lgc_lgl = llh->lgh_id;
633         jra->jra_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
634         jra->jra_lcookie.lgc_index = h->lrh_index;
635         INIT_LIST_HEAD(&jra->jra_committed_link);
636         spin_lock(&d->opd_syn_lock);
637         list_add_tail(&jra->jra_inflight_link, &d->opd_syn_inflight_list);
638         spin_unlock(&d->opd_syn_lock);
639
640         ptlrpcd_add_req(req);
641 }
642
643
644 /**
645  * Allocate and prepare RPC for a new change.
646  *
647  * The function allocates and initializes an RPC which will be sent soon to
648  * apply the change to the target OST. The request is initialized from the
649  * llog record passed. Notice only the fields common to all type of changes
650  * are initialized.
651  *
652  * \param[in] d         OSP device
653  * \param[in] op        type of the change
654  * \param[in] format    request format to be used
655  *
656  * \retval pointer              new request on success
657  * \retval ERR_PTR(errno)       on error
658  */
659 static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
660                                                ost_cmd_t op,
661                                                const struct req_format *format)
662 {
663         struct ptlrpc_request   *req;
664         struct obd_import       *imp;
665         int                      rc;
666
667         /* Prepare the request */
668         imp = d->opd_obd->u.cli.cl_import;
669         LASSERT(imp);
670
671         if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_ENOMEM))
672                 RETURN(ERR_PTR(-ENOMEM));
673
674         req = ptlrpc_request_alloc(imp, format);
675         if (req == NULL)
676                 RETURN(ERR_PTR(-ENOMEM));
677
678         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, op);
679         if (rc) {
680                 ptlrpc_req_finished(req);
681                 return ERR_PTR(rc);
682         }
683
684         req->rq_interpret_reply = osp_sync_interpret;
685         req->rq_commit_cb = osp_sync_request_commit_cb;
686         req->rq_cb_data = d;
687
688         ptlrpc_request_set_replen(req);
689
690         return req;
691 }
692
693 /**
694  * Generate a request for setattr change.
695  *
696  * The function prepares a new RPC, initializes it with setattr specific
697  * bits and send the RPC.
698  *
699  * \param[in] d         OSP device
700  * \param[in] llh       llog handle where the record is stored
701  * \param[in] h         llog record
702  *
703  * \retval 0            on success
704  * \retval 1            on invalid record
705  * \retval negative     negated errno on error
706  */
707 static int osp_sync_new_setattr_job(struct osp_device *d,
708                                     struct llog_handle *llh,
709                                     struct llog_rec_hdr *h)
710 {
711         struct llog_setattr64_rec       *rec = (struct llog_setattr64_rec *)h;
712         struct ptlrpc_request           *req;
713         struct ost_body                 *body;
714
715         ENTRY;
716         LASSERT(h->lrh_type == MDS_SETATTR64_REC);
717
718         if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_INVALID_REC))
719                 RETURN(1);
720
721         /* lsr_valid can only be 0 or HAVE OBD_MD_{FLUID, FLGID, FLPROJID} set,
722          * so no bits other than these should be set. */
723         if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID |
724             OBD_MD_FLPROJID)) != 0) {
725                 CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
726                         d->opd_obd->obd_name, rec->lsr_valid);
727                 /* return 1 on invalid record */
728                 RETURN(1);
729         }
730
731         req = osp_sync_new_job(d, OST_SETATTR, &RQF_OST_SETATTR);
732         if (IS_ERR(req))
733                 RETURN(PTR_ERR(req));
734
735         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
736         LASSERT(body);
737         body->oa.o_oi = rec->lsr_oi;
738         body->oa.o_uid = rec->lsr_uid;
739         body->oa.o_gid = rec->lsr_gid;
740         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
741         if (h->lrh_len > sizeof(struct llog_setattr64_rec))
742                 body->oa.o_projid = ((struct llog_setattr64_rec_v2 *)
743                                       rec)->lsr_projid;
744
745         /* old setattr record (prior 2.6.0) doesn't have 'valid' stored,
746          * we assume that both UID and GID are valid in that case. */
747         if (rec->lsr_valid == 0)
748                 body->oa.o_valid |= (OBD_MD_FLUID | OBD_MD_FLGID);
749         else
750                 body->oa.o_valid |= rec->lsr_valid;
751
752         osp_sync_send_new_rpc(d, llh, h, req);
753         RETURN(0);
754 }
755
756 /**
757  * Generate a request for unlink change.
758  *
759  * The function prepares a new RPC, initializes it with unlink(destroy)
760  * specific bits and sends the RPC. The function is used to handle
761  * llog_unlink_rec which were used in the older versions of Lustre.
762  * Current version uses llog_unlink_rec64.
763  *
764  * \param[in] d         OSP device
765  * \param[in] llh       llog handle where the record is stored
766  * \param[in] h         llog record
767  *
768  * \retval 0            on success
769  * \retval negative     negated errno on error
770  */
771 static int osp_sync_new_unlink_job(struct osp_device *d,
772                                    struct llog_handle *llh,
773                                    struct llog_rec_hdr *h)
774 {
775         struct llog_unlink_rec  *rec = (struct llog_unlink_rec *)h;
776         struct ptlrpc_request   *req;
777         struct ost_body         *body;
778
779         ENTRY;
780         LASSERT(h->lrh_type == MDS_UNLINK_REC);
781
782         req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
783         if (IS_ERR(req))
784                 RETURN(PTR_ERR(req));
785
786         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
787         LASSERT(body);
788         ostid_set_seq(&body->oa.o_oi, rec->lur_oseq);
789         ostid_set_id(&body->oa.o_oi, rec->lur_oid);
790         body->oa.o_misc = rec->lur_count;
791         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
792         if (rec->lur_count)
793                 body->oa.o_valid |= OBD_MD_FLOBJCOUNT;
794
795         osp_sync_send_new_rpc(d, llh, h, req);
796         RETURN(0);
797 }
798
799 /**
800  * Generate a request for unlink change.
801  *
802  * The function prepares a new RPC, initializes it with unlink(destroy)
803  * specific bits and sends the RPC. Depending on the target (MDT or OST)
804  * two different protocols are used. For MDT we use OUT (basically OSD API
805  * updates transferred via a network). For OST we still use the old
806  * protocol (OBD?), originally for compatibility. Later we can start to
807  * use OUT for OST as well, this will allow batching and better code
808  * unification.
809  *
810  * \param[in] d         OSP device
811  * \param[in] llh       llog handle where the record is stored
812  * \param[in] h         llog record
813  *
814  * \retval 0            on success
815  * \retval negative     negated errno on error
816  */
817 static int osp_sync_new_unlink64_job(struct osp_device *d,
818                                      struct llog_handle *llh,
819                                      struct llog_rec_hdr *h)
820 {
821         struct llog_unlink64_rec        *rec = (struct llog_unlink64_rec *)h;
822         struct ptlrpc_request           *req = NULL;
823         struct ost_body                 *body;
824         int                              rc;
825
826         ENTRY;
827         LASSERT(h->lrh_type == MDS_UNLINK64_REC);
828         req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
829         if (IS_ERR(req))
830                 RETURN(PTR_ERR(req));
831
832         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
833         if (body == NULL)
834                 RETURN(-EFAULT);
835         rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi);
836         if (rc < 0)
837                 RETURN(rc);
838         body->oa.o_misc = rec->lur_count;
839         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
840                            OBD_MD_FLOBJCOUNT;
841         osp_sync_send_new_rpc(d, llh, h, req);
842         RETURN(0);
843 }
844
845 /**
846  * Process llog records.
847  *
848  * This function is called to process the llog records committed locally.
849  * In the recovery model used by OSP we can apply a change to a remote
850  * target once corresponding transaction (like posix unlink) is committed
851  * locally so can't revert.
852  * Depending on the llog record type, a given handler is called that is
853  * responsible for preparing and sending the RPC to apply the change.
854  * Special record type LLOG_GEN_REC marking a reboot is cancelled right away.
855  *
856  * \param[in] env       LU environment provided by the caller
857  * \param[in] d         OSP device
858  * \param[in] llh       llog handle where the record is stored
859  * \param[in] rec       llog record
860  */
static void osp_sync_process_record(const struct lu_env *env,
				    struct osp_device *d,
				    struct llog_handle *llh,
				    struct llog_rec_hdr *rec)
{
	struct llog_handle	*cathandle = llh->u.phd.phd_cat_handle;
	struct llog_cookie	 cookie;
	int			 rc = 0;

	ENTRY;

	/* cookie identifying this record, used to cancel it in the
	 * catalog once it has been handled (or found invalid) */
	cookie.lgc_lgl = llh->lgh_id;
	cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
	cookie.lgc_index = rec->lrh_index;

	if (unlikely(rec->lrh_type == LLOG_GEN_REC)) {
		struct llog_gen_rec *gen = (struct llog_gen_rec *)rec;

		/* we're waiting for the record generated by this instance */
		LASSERT(d->opd_syn_prev_done == 0);
		if (!memcmp(&d->opd_syn_generation, &gen->lgr_gen,
			    sizeof(gen->lgr_gen))) {
			/* matched the marker written at this boot: every
			 * record before it came from an earlier boot */
			CDEBUG(D_HA, "processed all old entries\n");
			d->opd_syn_prev_done = 1;
		}

		/* cancel any generation record */
		rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);

		RETURN_EXIT;
	}

	/*
	 * now we prepare and fill requests to OST, put them on the queue
	 * and fire after next commit callback
	 */

	/* notice we increment counters before sending RPC, to be consistent
	 * in RPC interpret callback which may happen very quickly */
	atomic_inc(&d->opd_syn_rpc_in_flight);
	atomic_inc(&d->opd_syn_rpc_in_progress);

	switch (rec->lrh_type) {
	/* case MDS_UNLINK_REC is kept for compatibility */
	case MDS_UNLINK_REC:
		rc = osp_sync_new_unlink_job(d, llh, rec);
		break;
	case MDS_UNLINK64_REC:
		rc = osp_sync_new_unlink64_job(d, llh, rec);
		break;
	case MDS_SETATTR64_REC:
		rc = osp_sync_new_setattr_job(d, llh, rec);
		break;
	default:
		CERROR("%s: unknown record type: %x\n", d->opd_obd->obd_name,
		       rec->lrh_type);
		/* treat "unknown record type" as "invalid" */
		rc = 1;
		break;
	}

	/* For all kinds of records, not matter successful or not,
	 * we should decrease changes and bump last_processed_id.
	 */
	if (d->opd_syn_prev_done) {
		__u64 correct_id = osp_sync_correct_id(d, rec);
		LASSERT(atomic_read(&d->opd_syn_changes) > 0);
		LASSERT(correct_id <= d->opd_syn_last_committed_id);
		/* NOTE: it's possible to meet same id if
		 * OST stores few stripes of same file
		 */
		while (1) {
			/* another thread may be trying to set new value */
			rmb();
			if (correct_id > d->opd_syn_last_processed_id) {
				d->opd_syn_last_processed_id = correct_id;
				wake_up(&d->opd_syn_barrier_waitq);
			} else
				break;
		}
		atomic_dec(&d->opd_syn_changes);
	}
	/* a handler returning non-zero never queued an RPC, so undo the
	 * optimistic counter increments done above */
	if (rc != 0) {
		atomic_dec(&d->opd_syn_rpc_in_flight);
		atomic_dec(&d->opd_syn_rpc_in_progress);
	}

	CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
	       d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
	       atomic_read(&d->opd_syn_rpc_in_progress));

	/* Delete the invalid record */
	if (rc == 1) {
		rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);
		if (rc != 0)
			CERROR("%s: can't delete invalid record: "
			       "fid = "DFID", rec_id = %u, rc = %d\n",
			       d->opd_obd->obd_name,
			       PFID(lu_object_fid(&cathandle->lgh_obj->do_lu)),
			       rec->lrh_id, rc);
	}

	CDEBUG(D_HA, "found record %x, %d, idx %u, id %u\n",
	       rec->lrh_type, rec->lrh_len, rec->lrh_index, rec->lrh_id);

	RETURN_EXIT;
}
968
969 /**
970  * Cancel llog records for the committed changes.
971  *
972  * The function walks through the list of the committed RPCs and cancels
973  * corresponding llog records. see osp_sync_request_commit_cb() for the
974  * details.
975  *
976  * \param[in] env       LU environment provided by the caller
977  * \param[in] d         OSP device
978  */
static void osp_sync_process_committed(const struct lu_env *env,
				       struct osp_device *d)
{
	struct obd_device	*obd = d->opd_obd;
	struct obd_import	*imp = obd->u.cli.cl_import;
	struct ost_body		*body;
	struct ptlrpc_request	*req;
	struct llog_ctxt	*ctxt;
	struct llog_handle	*llh;
	struct list_head	 list;
	int			 rc, done = 0;

	ENTRY;

	/* fast path: nothing was committed since the last call */
	if (list_empty(&d->opd_syn_committed_there))
		return;

	/*
	 * if current status is -ENOSPC (lack of free space on OST)
	 * then we should poll OST immediately once object destroy
	 * is committed.
	 * notice: we do this upon commit as well because some backends
	 * (like DMU) do not release space right away.
	 */
	if (d->opd_pre != NULL && unlikely(d->opd_pre_status == -ENOSPC))
		osp_statfs_need_now(d);

	/*
	 * now cancel them all
	 * XXX: can we improve this using some batching?
	 *      with batch RPC that'll happen automatically?
	 * XXX: can we store ctxt in lod_device and save few cycles ?
	 */
	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	LASSERT(ctxt);

	llh = ctxt->loc_handle;
	LASSERT(llh);

	/* move the whole committed list onto a private one under the lock,
	 * then walk it without holding the spinlock */
	INIT_LIST_HEAD(&list);
	spin_lock(&d->opd_syn_lock);
	list_splice(&d->opd_syn_committed_there, &list);
	INIT_LIST_HEAD(&d->opd_syn_committed_there);
	spin_unlock(&d->opd_syn_lock);

	while (!list_empty(&list)) {
		struct osp_job_req_args	*jra;

		jra = list_entry(list.next, struct osp_job_req_args,
				 jra_committed_link);
		LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
		list_del_init(&jra->jra_committed_link);

		/* jra is embedded in the request's rq_async_args, so the
		 * owning request can be recovered with container_of() */
		req = container_of((void *)jra, struct ptlrpc_request,
				   rq_async_args);
		body = req_capsule_client_get(&req->rq_pill,
					      &RMF_OST_BODY);
		LASSERT(body);
		/* import can be closing, thus all commit cb's are
		 * called we can check committness directly */
		if (req->rq_import_generation == imp->imp_generation) {
			rc = llog_cat_cancel_records(env, llh, 1,
						     &jra->jra_lcookie);
			if (rc)
				CERROR("%s: can't cancel record: %d\n",
				       obd->obd_name, rc);
		} else {
			/* stale import generation: keep the llog record so
			 * the change is replayed after reconnect */
			DEBUG_REQ(D_OTHER, req, "imp_committed = %llu",
				  imp->imp_peer_committed_transno);
		}
		ptlrpc_req_finished(req);
		done++;
	}

	llog_ctxt_put(ctxt);

	LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) >= done);
	atomic_sub(done, &d->opd_syn_rpc_in_progress);
	CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
	       d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
	       atomic_read(&d->opd_syn_rpc_in_progress));

	osp_sync_check_for_work(d);

	/* wake up the thread if requested to stop:
	 * it might be waiting for in-progress to complete */
	if (unlikely(osp_sync_running(d) == 0))
		wake_up(&d->opd_syn_waitq);

	EXIT;
}
1070
1071 /**
1072  * The core of the syncing mechanism.
1073  *
1074  * This is a callback called by the llog processing function. Essentially it
1075  * suspends llog processing until there is a record to process (it's supposed
1076  * to be committed locally). The function handles RPCs committed by the target
1077  * and cancels corresponding llog records.
1078  *
1079  * \param[in] env       LU environment provided by the caller
1080  * \param[in] llh       llog handle we're processing
1081  * \param[in] rec       current llog record
1082  * \param[in] data      callback data containing a pointer to the device
1083  *
1084  * \retval 0                    to ask the caller (llog_process()) to continue
1085  * \retval LLOG_PROC_BREAK      to ask the caller to break
1086  */
static int osp_sync_process_queues(const struct lu_env *env,
				   struct llog_handle *llh,
				   struct llog_rec_hdr *rec,
				   void *data)
{
	struct osp_device	*d = data;

	/* loop until either the sync thread is asked to stop or the
	 * current record has been consumed and a new one is wanted */
	do {
		struct l_wait_info lwi = { 0 };

		if (!osp_sync_running(d)) {
			CDEBUG(D_HA, "stop llog processing\n");
			return LLOG_PROC_BREAK;
		}

		/* process requests committed by OST */
		osp_sync_process_committed(env, d);

		/* if we there are changes to be processed and we have
		 * resources for this ... do now */
		if (osp_sync_can_process_new(d, rec)) {
			if (llh == NULL) {
				/* ask llog for another record */
				CDEBUG(D_HA, "%u changes, %u in progress,"
				       " %u in flight\n",
				       atomic_read(&d->opd_syn_changes),
				       atomic_read(&d->opd_syn_rpc_in_progress),
				       atomic_read(&d->opd_syn_rpc_in_flight));
				return 0;
			}
			osp_sync_process_record(env, d, llh, rec);
			/* mark the record consumed so subsequent loop
			 * iterations don't process it twice */
			llh = NULL;
			rec = NULL;
		}

		if (d->opd_syn_last_processed_id == d->opd_syn_last_used_id)
			osp_sync_remove_from_tracker(d);

		/* sleep until there is something to do: stop request,
		 * capacity for a new RPC, or committed replies to reap */
		l_wait_event(d->opd_syn_waitq,
			     !osp_sync_running(d) ||
			     osp_sync_can_process_new(d, rec) ||
			     !list_empty(&d->opd_syn_committed_there),
			     &lwi);
	} while (1);
}
1132
1133 /**
1134  * OSP sync thread.
1135  *
1136  * This thread runs llog_cat_process() scanner calling our callback
1137  * to process llog records. in the callback we implement tricky
1138  * state machine as we don't want to start scanning of the llog again
1139  * and again, also we don't want to process too many records and send
1140  * too many RPCs a time. so, depending on current load (num of changes
1141  * being synced to OST) the callback can suspend awaiting for some
1142  * new conditions, like syncs completed.
1143  *
1144  * In order to process llog records left by previous boots and to allow
1145  * llog_process_thread() to find something (otherwise it'd just exit
1146  * immediately) we add a special GENERATION record on each boot.
1147  *
1148  * \param[in] _arg      a pointer to thread's arguments
1149  *
1150  * \retval 0            on success
1151  * \retval negative     negated errno on error
1152  */
static int osp_sync_thread(void *_arg)
{
	struct osp_device	*d = _arg;
	struct ptlrpc_thread	*thread = &d->opd_syn_thread;
	struct l_wait_info	 lwi = { 0 };
	struct llog_ctxt	*ctxt;
	struct obd_device	*obd = d->opd_obd;
	struct llog_handle	*llh;
	struct lu_env		 env;
	int			 rc, count;

	ENTRY;

	rc = lu_env_init(&env, LCT_LOCAL);
	if (rc) {
		CERROR("%s: can't initialize env: rc = %d\n",
		       obd->obd_name, rc);

		/* report SVC_STOPPED so osp_sync_init() stops waiting */
		spin_lock(&d->opd_syn_lock);
		thread->t_flags = SVC_STOPPED;
		spin_unlock(&d->opd_syn_lock);
		wake_up(&thread->t_ctl_waitq);

		RETURN(rc);
	}

	/* announce the thread is up and running */
	spin_lock(&d->opd_syn_lock);
	thread->t_flags = SVC_RUNNING;
	spin_unlock(&d->opd_syn_lock);
	wake_up(&thread->t_ctl_waitq);

	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	if (ctxt == NULL) {
		CERROR("can't get appropriate context\n");
		GOTO(out, rc = -EINVAL);
	}

	llh = ctxt->loc_handle;
	if (llh == NULL) {
		CERROR("can't get llh\n");
		llog_ctxt_put(ctxt);
		GOTO(out, rc = -EINVAL);
	}

	/* the scanner normally doesn't return until shutdown: the callback
	 * suspends itself waiting for new records/resources */
	rc = llog_cat_process(&env, llh, osp_sync_process_queues, d, 0, 0);
	if (rc < 0) {
		CERROR("%s: llog process with osp_sync_process_queues "
		       "failed: %d\n", d->opd_obd->obd_name, rc);
		GOTO(close, rc);
	}
	LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK,
		 "%u changes, %u in progress, %u in flight: %d\n",
		 atomic_read(&d->opd_syn_changes),
		 atomic_read(&d->opd_syn_rpc_in_progress),
		 atomic_read(&d->opd_syn_rpc_in_flight), rc);

	/* we don't expect llog_process_thread() to exit till umount */
	LASSERTF(thread->t_flags != SVC_RUNNING,
		 "%u changes, %u in progress, %u in flight\n",
		 atomic_read(&d->opd_syn_changes),
		 atomic_read(&d->opd_syn_rpc_in_progress),
		 atomic_read(&d->opd_syn_rpc_in_flight));

	/* wait till all the requests are completed */
	count = 0;
	while (atomic_read(&d->opd_syn_rpc_in_progress) > 0) {
		osp_sync_process_committed(&env, d);

		lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
		rc = l_wait_event(d->opd_syn_waitq,
				  atomic_read(&d->opd_syn_rpc_in_progress) == 0,
				  &lwi);
		if (rc == -ETIMEDOUT)
			count++;
		/* about 50 seconds of waiting is considered a stuck drain */
		LASSERTF(count < 10, "%s: %d %d %sempty\n",
			 d->opd_obd->obd_name,
			 atomic_read(&d->opd_syn_rpc_in_progress),
			 atomic_read(&d->opd_syn_rpc_in_flight),
			 list_empty(&d->opd_syn_committed_there) ? "" : "!");

	}

close:
	llog_cat_close(&env, llh);
	rc = llog_cleanup(&env, ctxt);
	if (rc)
		CERROR("can't cleanup llog: %d\n", rc);
out:
	LASSERTF(atomic_read(&d->opd_syn_rpc_in_progress) == 0,
		 "%s: %d %d %sempty\n",
		 d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_progress),
		 atomic_read(&d->opd_syn_rpc_in_flight),
		 list_empty(&d->opd_syn_committed_there) ? "" : "!");

	thread->t_flags = SVC_STOPPED;

	wake_up(&thread->t_ctl_waitq);

	lu_env_fini(&env);

	RETURN(0);
}
1255
1256 /**
1257  * Initialize llog.
1258  *
1259  * Initializes the llog. Specific llog to be used depends on the type of the
1260  * target OSP represents (OST or MDT). The function appends a new llog
1261  * record to mark the place where the records associated with this boot
1262  * start.
1263  *
1264  * \param[in] env       LU environment provided by the caller
1265  * \param[in] d         OSP device
1266  *
1267  * \retval 0            on success
1268  * \retval negative     negated errno on error
1269  */
static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
{
	struct osp_thread_info	*osi = osp_env_info(env);
	struct lu_fid		*fid = &osi->osi_fid;
	struct llog_handle	*lgh = NULL;
	struct obd_device	*obd = d->opd_obd;
	struct llog_ctxt	*ctxt;
	int			rc;

	ENTRY;

	LASSERT(obd);

	/*
	 * open llog corresponding to our OST
	 */
	OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
	obd->obd_lvfs_ctxt.dt = d->opd_storage;

	lu_local_obj_fid(fid, LLOG_CATALOGS_OID);

	/* look up the previously stored catalog id for this target index */
	rc = llog_osd_get_cat_list(env, d->opd_storage, d->opd_index, 1,
				   &osi->osi_cid, fid);
	if (rc < 0) {
		if (rc != -EFAULT) {
			CERROR("%s: can't get id from catalogs: rc = %d\n",
			       obd->obd_name, rc);
			RETURN(rc);
		}

		/* After sparse OST indices is supported, the CATALOG file
		 * may become a sparse file that results in failure on
		 * reading. Skip this error as the llog will be created
		 * later */
		memset(&osi->osi_cid, 0, sizeof(osi->osi_cid));
		rc = 0;
	}

	CDEBUG(D_INFO, "%s: Init llog for %d - catid "DOSTID":%x\n",
	       obd->obd_name, d->opd_index,
	       POSTID(&osi->osi_cid.lci_logid.lgl_oi),
	       osi->osi_cid.lci_logid.lgl_ogen);

	rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT,
			d->opd_storage->dd_lu_dev.ld_obd,
			&osp_mds_ost_orig_logops);
	if (rc)
		RETURN(rc);

	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	LASSERT(ctxt);

	/* a non-zero stored id means a catalog existed before; try to
	 * reopen it, and fall back to re-creation if it vanished */
	if (likely(logid_id(&osi->osi_cid.lci_logid) != 0)) {
		rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid, NULL,
			       LLOG_OPEN_EXISTS);
		/* re-create llog if it is missing */
		if (rc == -ENOENT)
			logid_set_id(&osi->osi_cid.lci_logid, 0);
		else if (rc < 0)
			GOTO(out_cleanup, rc);
	}

	if (unlikely(logid_id(&osi->osi_cid.lci_logid) == 0)) {
		rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
		if (rc < 0)
			GOTO(out_cleanup, rc);
		osi->osi_cid.lci_logid = lgh->lgh_id;
	}

	LASSERT(lgh != NULL);
	ctxt->loc_handle = lgh;

	rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
	if (rc)
		GOTO(out_close, rc);

	/* persist the (possibly new) catalog id for the next boot */
	rc = llog_osd_put_cat_list(env, d->opd_storage, d->opd_index, 1,
				   &osi->osi_cid, fid);
	if (rc)
		GOTO(out_close, rc);

	/*
	 * put a generation mark in the llog; all records up to this mark
	 * are treated as left over from previous boots and are processed
	 * unconditionally (see osp_sync_process_record())
	 */
	d->opd_syn_generation.mnt_cnt = cfs_time_current();
	d->opd_syn_generation.conn_cnt = cfs_time_current();

	osi->osi_hdr.lrh_type = LLOG_GEN_REC;
	osi->osi_hdr.lrh_len = sizeof(osi->osi_gen);

	memcpy(&osi->osi_gen.lgr_gen, &d->opd_syn_generation,
	       sizeof(osi->osi_gen.lgr_gen));

	rc = llog_cat_add(env, lgh, &osi->osi_gen.lgr_hdr, &osi->osi_cookie);
	if (rc < 0)
		GOTO(out_close, rc);
	llog_ctxt_put(ctxt);
	RETURN(0);
out_close:
	llog_cat_close(env, lgh);
out_cleanup:
	llog_cleanup(env, ctxt);
	RETURN(rc);
}
1375
1376 /**
1377  * Cleanup llog used for syncing.
1378  *
1379  * Closes and cleanups the llog. The function is called when the device is
1380  * shutting down.
1381  *
1382  * \param[in] env       LU environment provided by the caller
1383  * \param[in] d         OSP device
1384  */
1385 static void osp_sync_llog_fini(const struct lu_env *env, struct osp_device *d)
1386 {
1387         struct llog_ctxt *ctxt;
1388
1389         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
1390         if (ctxt) {
1391                 llog_cat_close(env, ctxt->loc_handle);
1392                 llog_cleanup(env, ctxt);
1393         }
1394 }
1395
1396 /**
1397  * Initialization of the sync component of OSP.
1398  *
1399  * Initializes the llog and starts a new thread to handle the changes to
1400  * the remote target (OST or MDT).
1401  *
1402  * \param[in] env       LU environment provided by the caller
1403  * \param[in] d         OSP device
1404  *
1405  * \retval 0            on success
1406  * \retval negative     negated errno on error
1407  */
int osp_sync_init(const struct lu_env *env, struct osp_device *d)
{
	struct l_wait_info	 lwi = { 0 };
	struct task_struct	*task;
	int			 rc;

	ENTRY;

	/* set up limits, locks, waitqueues and lists used by the sync
	 * machinery before anything can reference them */
	d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
	d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
	spin_lock_init(&d->opd_syn_lock);
	init_waitqueue_head(&d->opd_syn_waitq);
	init_waitqueue_head(&d->opd_syn_barrier_waitq);
	thread_set_flags(&d->opd_syn_thread, SVC_INIT);
	init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq);
	INIT_LIST_HEAD(&d->opd_syn_inflight_list);
	INIT_LIST_HEAD(&d->opd_syn_committed_there);

	/* read-only storage: no changes will be generated, no thread needed */
	if (d->opd_storage->dd_rdonly)
		RETURN(0);

	rc = osp_sync_id_traction_init(d);
	if (rc)
		RETURN(rc);

	/*
	 * initialize llog storing changes
	 */
	rc = osp_sync_llog_init(env, d);
	if (rc) {
		CERROR("%s: can't initialize llog: rc = %d\n",
		       d->opd_obd->obd_name, rc);
		GOTO(err_id, rc);
	}

	/*
	 * Start synchronization thread
	 */
	task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u",
			   d->opd_index, d->opd_group);
	if (IS_ERR(task)) {
		rc = PTR_ERR(task);
		CERROR("%s: cannot start sync thread: rc = %d\n",
		       d->opd_obd->obd_name, rc);
		GOTO(err_llog, rc);
	}

	/* wait until the thread has either entered its main loop or
	 * failed to start (it signals both via t_ctl_waitq) */
	l_wait_event(d->opd_syn_thread.t_ctl_waitq,
		     osp_sync_running(d) || osp_sync_stopped(d), &lwi);

	RETURN(0);
err_llog:
	osp_sync_llog_fini(env, d);
err_id:
	osp_sync_id_traction_fini(d);
	return rc;
}
1465
1466 /**
1467  * Stop the syncing thread.
1468  *
1469  * Asks the syncing thread to stop and wait until it's stopped.
1470  *
1471  * \param[in] d         OSP device
1472  *
1473  * \retval              0
1474  */
1475 int osp_sync_fini(struct osp_device *d)
1476 {
1477         struct ptlrpc_thread *thread = &d->opd_syn_thread;
1478
1479         ENTRY;
1480
1481         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
1482                 thread->t_flags = SVC_STOPPING;
1483                 wake_up(&d->opd_syn_waitq);
1484                 wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
1485         }
1486
1487         /*
1488          * unregister transaction callbacks only when sync thread
1489          * has finished operations with llog
1490          */
1491         osp_sync_id_traction_fini(d);
1492
1493         RETURN(0);
1494 }
1495
/* Global registry of per-OSD commit trackers: all OSP devices sitting on
 * the same local OSD share one tracker. The list and the refcounting of
 * its entries are protected by osp_id_tracker_sem. */
static DEFINE_MUTEX(osp_id_tracker_sem);
static struct list_head osp_id_tracker_list =
		LIST_HEAD_INIT(osp_id_tracker_list);
1499
1500 /**
1501  * OSD commit callback.
1502  *
1503  * The function is used as a local OSD commit callback to track the highest
1504  * committed llog record id. see osp_sync_id_traction_init() for the details.
1505  *
1506  * \param[in] th        local transaction handle committed
1507  * \param[in] cookie    commit callback data (our private structure)
1508  */
static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
{
	struct osp_id_tracker	*tr = cookie;
	struct osp_device	*d;
	struct osp_txn_info	*txn;

	LASSERT(tr);

	/* skip transactions that carry no llog id or an id that has
	 * already been surpassed (cheap check before taking the lock) */
	txn = osp_txn_info(&th->th_ctx);
	if (txn == NULL || txn->oti_current_id < tr->otr_committed_id)
		return;

	spin_lock(&tr->otr_lock);
	/* re-check under the lock: another commit may have advanced it */
	if (likely(txn->oti_current_id > tr->otr_committed_id)) {
		CDEBUG(D_OTHER, "committed: %llu -> %llu\n",
		       tr->otr_committed_id, txn->oti_current_id);
		tr->otr_committed_id = txn->oti_current_id;

		/* propagate the new committed id to every OSP registered
		 * on this tracker and wake their sync threads */
		list_for_each_entry(d, &tr->otr_wakeup_list,
				    opd_syn_ontrack) {
			d->opd_syn_last_committed_id = tr->otr_committed_id;
			wake_up(&d->opd_syn_waitq);
		}
	}
	spin_unlock(&tr->otr_lock);
}
1535
1536 /**
1537  * Initialize commit tracking mechanism.
1538  *
1539  * Some setups may have thousands of OSTs and each will be represented by OSP.
1540  * Meaning orders of magnitude more changes to apply every second. In order
1541  * to keep the number of commit callbacks low this mechanism was introduced.
1542  * The mechanism is very similar to transno used by MDT service: it's an single
1543  * ID stream which can be assigned by any OSP to its llog records. The tricky
1544  * part is that ID is stored in per-transaction data and re-used by all the OSPs
1545  * involved in that transaction. Then all these OSPs are woken up utilizing a single OSD commit callback.
1546  *
1547  * The function initializes the data used by the tracker described above.
1548  * A single tracker per OSD device is created.
1549  *
1550  * \param[in] d         OSP device
1551  *
1552  * \retval 0            on success
1553  * \retval negative     negated errno on error
1554  */
1555 static int osp_sync_id_traction_init(struct osp_device *d)
1556 {
1557         struct osp_id_tracker   *tr, *found = NULL;
1558         int                      rc = 0;
1559
1560         LASSERT(d);
1561         LASSERT(d->opd_storage);
1562         LASSERT(d->opd_syn_tracker == NULL);
1563         INIT_LIST_HEAD(&d->opd_syn_ontrack);
1564
1565         mutex_lock(&osp_id_tracker_sem);
1566         /* reuse an existing tracker already bound to the same OSD */
1567         list_for_each_entry(tr, &osp_id_tracker_list, otr_list) {
1568                 if (tr->otr_dev != d->opd_storage)
1569                         continue;
1570                 LASSERT(atomic_read(&tr->otr_refcount));
1571                 atomic_inc(&tr->otr_refcount);
1572                 d->opd_syn_tracker = tr;
1573                 found = tr;
1574                 break;
1575         }
1576
1577         if (found == NULL) {
1578                 /* first OSP on this OSD: create and register a tracker */
1579                 OBD_ALLOC_PTR(tr);
1580                 if (tr == NULL) {
1581                         rc = -ENOMEM;
1582                 } else {
1583                         d->opd_syn_tracker = tr;
1584                         spin_lock_init(&tr->otr_lock);
1585                         tr->otr_dev = d->opd_storage;
1586                         tr->otr_next_id = 1;
1587                         tr->otr_committed_id = 0;
1588                         atomic_set(&tr->otr_refcount, 1);
1589                         INIT_LIST_HEAD(&tr->otr_wakeup_list);
1590                         list_add(&tr->otr_list, &osp_id_tracker_list);
1591                         tr->otr_tx_cb.dtc_txn_commit =
1592                                                 osp_sync_tracker_commit_cb;
1593                         tr->otr_tx_cb.dtc_cookie = tr;
1594                         tr->otr_tx_cb.dtc_tag = LCT_MD_THREAD;
1595                         dt_txn_callback_add(d->opd_storage, &tr->otr_tx_cb);
1596                 }
1597         }
1598         mutex_unlock(&osp_id_tracker_sem);
1599
1600         return rc;
1601 }
1600
1601 /**
1602  * Release commit tracker.
1603  *
1604  * Decrease the refcounter on the tracker used by the given OSP device \a d.
1605  * If no users are left, then the tracker is released.
1606  *
1607  * \param[in] d         OSP device
1608  */
1609 static void osp_sync_id_traction_fini(struct osp_device *d)
1610 {
1611         struct osp_id_tracker *tr;
1612
1613         ENTRY;
1614
1615         LASSERT(d);
1616         tr = d->opd_syn_tracker;
1617         if (tr == NULL) {
1618                 EXIT;
1619                 return;
1620         }
1621
1622         osp_sync_remove_from_tracker(d);
1623
1624         mutex_lock(&osp_id_tracker_sem);
1625         /* this OSP is done with the tracker in any case; clear the
1626          * pointer unconditionally so a finalized device never keeps
1627          * a stale reference to a tracker still shared by other OSPs */
1628         d->opd_syn_tracker = NULL;
1629         if (atomic_dec_and_test(&tr->otr_refcount)) {
1630                 /* last user: unregister the commit callback and free */
1631                 dt_txn_callback_del(d->opd_storage, &tr->otr_tx_cb);
1632                 LASSERT(list_empty(&tr->otr_wakeup_list));
1633                 list_del(&tr->otr_list);
1634                 OBD_FREE_PTR(tr);
1635         }
1636         mutex_unlock(&osp_id_tracker_sem);
1637
1638         EXIT;
1639 }
1636
1637 /**
1638  * Generate a new ID on a tracker.
1639  *
1640  * Generates a new ID using the tracker associated with the given OSP device
1641  * \a d, if the given ID \a id is non-zero. Unconditionally adds OSP device to
1642  * the wakeup list, so OSP won't miss when a transaction using the ID is
1643  * committed.
1644  *
1645  * \param[in] d         OSP device
1646  * \param[in] id        0 or ID generated previously
1647  *
1648  * \retval              ID the caller should use
1649  */
1650 static __u64 osp_sync_id_get(struct osp_device *d, __u64 id)
1651 {
1652         struct osp_id_tracker *tr = d->opd_syn_tracker;
1653
1654         LASSERT(tr);
1655
1656         /* XXX: per-cpu preallocated ids could reduce otr_lock contention */
1657         spin_lock(&tr->otr_lock);
1658         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_TRACK_OVERFLOW))
1659                 tr->otr_next_id = 0xfffffff0;
1660
1661         /* the ID stream must stay monotonic w.r.t. the last one used */
1662         if (unlikely(tr->otr_next_id <= d->opd_syn_last_used_id)) {
1663                 spin_unlock(&tr->otr_lock);
1664                 CERROR("%s: next %llu, last synced %llu\n",
1665                        d->opd_obd->obd_name, tr->otr_next_id,
1666                        d->opd_syn_last_used_id);
1667                 LBUG();
1668         }
1669
1670         if (id == 0)
1671                 id = tr->otr_next_id++;
1672         if (d->opd_syn_last_used_id < id)
1673                 d->opd_syn_last_used_id = id;
1674         /* subscribe for commit wakeups unless already on the list */
1675         if (list_empty(&d->opd_syn_ontrack))
1676                 list_add(&d->opd_syn_ontrack, &tr->otr_wakeup_list);
1677         spin_unlock(&tr->otr_lock);
1678         CDEBUG(D_OTHER, "new id %llu\n", id);
1679
1680         return id;
1681 }
1681
1682 /**
1683  * Stop to propagate commit status to OSP.
1684  *
1685  * If the OSP does not have any llog records it is waiting to commit, then
1686  * it is possible to unsubscribe from wakeups from the tracker using this
1687  * method.
1688  *
1689  * \param[in] d         OSP device not willing to wakeup
1690  */
1691 static void osp_sync_remove_from_tracker(struct osp_device *d)
1692 {
1693         struct osp_id_tracker *tr = d->opd_syn_tracker;
1694
1695         LASSERT(tr);
1696
1697         /* cheap unlocked check first; the real removal is done
1698          * under otr_lock via list_del_init() */
1699         if (!list_empty(&d->opd_syn_ontrack)) {
1700                 spin_lock(&tr->otr_lock);
1701                 list_del_init(&d->opd_syn_ontrack);
1702                 spin_unlock(&tr->otr_lock);
1703         }
1704 }
1705