Skip to content

dev #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 15, 2017
Merged

dev #32

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ binlog-format = row #Very important if you want to receive write, update and
## MySQL Test Tables

We have to split the test table into several smaller tables because of this error produced by MySQL:
```bash
```text
ERROR 1118 (42000): Row size too large. The maximum row size for the used table type, not counting BLOBs, is 65535. This includes storage overhead, check the manual. You have to change some columns to TEXT or BLOBs
```

Expand Down Expand Up @@ -727,6 +727,7 @@ CREATE TABLE IF NOT EXISTS `airline`.`ontime` (
### ClickHouse Table

```sql
CREATE DATABASE IF NOT EXISTS `airline`;
CREATE TABLE IF NOT EXISTS `airline`.`ontime` (
`Year` UInt16,
`Quarter` UInt8,
Expand Down
19 changes: 12 additions & 7 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,29 @@

import sys
import multiprocessing as mp

import logging
import pprint

# Refuse to start on interpreters older than Python 3.
if sys.version_info[0] < 3:
    # A bare string cannot be raised (that is itself a TypeError and hides
    # the message), so raise a real exception carrying the explanation.
    raise RuntimeError("Must be using Python 3")



class Main(Daemon):

config = None

def __init__(self):
mp.set_start_method('forkserver')
self.config = CLIOpts.config()
super().__init__(pidfile=self.config.pid_file())

print('---')
print(self.config)
print('---')
logging.basicConfig(
filename=self.config.log_file(),
level=self.config.log_level(),
format='%(asctime)s - %(levelname)s - %(message)s'
)
super().__init__(pidfile=self.config.pid_file())
logging.debug(pprint.pformat(self.config.config))
# mp.set_start_method('forkserver')

def run(self):
pumper = Pumper(
Expand All @@ -36,7 +41,7 @@ def run(self):
def start(self):
if self.config.is_daemon():
if not super().start():
print("Error going background. The process already running?")
logging.error("Error going background. The process already running?")
else:
self.run()

Expand Down
2 changes: 2 additions & 0 deletions run.sh → run_datatypes.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/bin/bash

sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"

python3 main.py \
--src-resume \
--src-wait \
Expand Down
23 changes: 23 additions & 0 deletions run_ontime.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash

# Launcher script: flips one kernel TCP setting, then starts main.py with a
# fixed set of command-line options.

# Write 1 into /proc/sys/net/ipv4/tcp_tw_reuse; sudo is used because the
# write goes through a root shell.
sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"

# Options handed to main.py, one per line so they are easy to toggle.
main_args=(
    --src-resume
    --src-wait
    --nice-pause=1
    --src-host=127.0.0.1
    --src-user=root
    --dst-host=127.0.0.1
    --csvpool
    --csvpool-file-path-prefix=qwe_
    --mempool-max-flush-interval=60
    --mempool-max-events-num=100000
)

python3.6 main.py "${main_args[@]}"

# Other options that are currently not passed:
#	--mempool
#	--mempool-max-events-num=3
#	--mempool-max-flush-interval=30
#	--dst-file=dst.csv
#	--dst-schema=db
#	--dst-table=datatypes
#	--csvpool-keep-files
44 changes: 43 additions & 1 deletion src/cliopts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import argparse
from .config import Config
import logging


class CLIOpts(object):
Expand Down Expand Up @@ -41,6 +42,25 @@ def join(lists_to_join):
else:
return None

@staticmethod
def log_level_from_string(log_level_string):
level = log_level_string.upper()

if level == 'CRITICAL':
return logging.CRITICAL
elif level == 'ERROR':
return logging.ERROR
elif level == 'WARNING':
return logging.WARNING
elif level == 'INFO':
return logging.INFO
elif level == 'DEBUG':
return logging.DEBUG
elif level == 'NOTSET':
return logging.NOTSET
else:
return logging.NOTSET

@staticmethod
def config():
"""Parse application's CLI options into options dictionary
Expand All @@ -55,9 +75,27 @@ def config():
argparser.add_argument(
'--config-file',
type=str,
default='',
default=None,
help='Path to config file. Default - not specified'
)
argparser.add_argument(
'--log-file',
type=str,
default=None,
help='Path to log file. Default - not specified'
)
argparser.add_argument(
'--log-level',
type=str,
default="NOTSET",
help='Log Level. Default - NOTSET'
)
argparser.add_argument(
'--nice-pause',
type=int,
default=None,
help='make nice pause between attempts to read binlog stream'
)
argparser.add_argument(
'--dry',
action='store_true',
Expand Down Expand Up @@ -226,6 +264,8 @@ def config():
return Config ({
'app-config': {
'config-file': args.config_file,
'log-file': args.log_file,
'log-level': CLIOpts.log_level_from_string(args.log_level),
'dry': args.dry,
'daemon': args.daemon,
'pid_file': args.pid_file,
Expand Down Expand Up @@ -257,9 +297,11 @@ def config():
'only_tables': [x for x in args.src_only_tables.split(',') if x] if args.src_only_tables else None,
'blocking': args.src_wait,
'resume_stream': args.src_resume,
'nice_pause': 0 if args.nice_pause is None else args.nice_pause,
},
'file': {
'csv_file_path': args.src_file,
'nice_pause': 0 if args.nice_pause is None else args.nice_pause,
},
},

Expand Down
9 changes: 9 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ def __str__(self):
def __getitem__(self, item):
return self.config[item]

def log_file(self):
    """Return the value stored under ``app-config`` / ``log-file``."""
    app_section = self.config['app-config']
    return app_section['log-file']

def log_level(self):
    """Return the value stored under ``app-config`` / ``log-level``."""
    app_section = self.config['app-config']
    return app_section['log-level']

def nice_pause(self):
    """Return the value stored under ``app-config`` / ``nice-pause``.

    NOTE(review): the visible part of ``CLIOpts.config()`` stores this
    option as ``nice_pause`` (underscore) inside the reader sections, not
    under ``app-config``. Confirm that ``app-config`` really carries a
    ``nice-pause`` key; otherwise this lookup raises ``KeyError``.
    """
    app_section = self.config['app-config']
    return app_section['nice-pause']

def pid_file(self):
return self.config['app-config']['pid_file']

Expand Down
3 changes: 2 additions & 1 deletion src/observable.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging

class Observable(object):

Expand All @@ -25,4 +26,4 @@ def notify(self, event_name, **attrs):
callback(**attrs)

def subscribers(self, event_name):
return event_name in self.event_handlers and (len(self.event_handlers[event_name]) > 0)
return event_name in self.event_handlers and (len(self.event_handlers[event_name]) > 0)
80 changes: 60 additions & 20 deletions src/pool/bbpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from .pool import Pool
from ..objectbuilder import ObjectBuilder
import logging


# Buckets Belts' Index Generator
Expand All @@ -20,17 +21,22 @@ class BBPool(Pool):

# buckets on the belts
belts = {
# pour data into 0-index bucket
# 'key.1': [[item,], [item, item, item,], [item, item, item,]]
# 'key.2': [[item,], [item, item, item,], [item, item, item,]]
# pour data into 0-index bucket
# 'key.1': [[item,], [item, item, item,], [item, item, item,]]
# 'key.2': [[item,], [item, item, item,], [item, item, item,]]
}

belts_rotated_at = {
# 'key.1': UNIX TIMESTAMP
# 'key.2': UNIX TIMESTAMP
# 'key.1': UNIX TIMESTAMP
# 'key.2': UNIX TIMESTAMP
}

buckets_count = 0
items_count = 0;

prev_time = None
prev_buckets_count = 0
prev_items_count = 0;

def __init__(
self,
Expand All @@ -49,11 +55,14 @@ def __init__(
)

def create_belt(self, belt_index):
# create belt with one empty bucket
"""create belt with one empty bucket"""

self.belts[belt_index] = [[]]
self.belts_rotated_at[belt_index] = int(time.time())

def insert(self, item):
"""Insert item into pool"""

# which belt we'll insert item?
belt_index = self.key_generator.generate(item)

Expand All @@ -64,12 +73,12 @@ def insert(self, item):
# append item to the 0-indexed bucket of the specified belt
self.belts[belt_index][0].append(item)

# may be bucket is already full
if len(self.belts[belt_index][0]) >= self.max_bucket_size:
# bucket full, rotate the belt
self.rotate_belt(belt_index)
# try to rotate belt - maybe it already should be rotated
self.rotate_belt(belt_index)

def flush(self, key=None):
"""Flush all buckets from the belt and delete the belt itself"""

belt_index = key
empty_belts_indexes = []

Expand All @@ -87,25 +96,27 @@ def flush(self, key=None):
self.belts_rotated_at.pop(b_index)

def rotate_belt(self, belt_index, flush=False):
"""Try to rotate belt"""

now = int(time.time())
need_rotation = True if flush else False
rotate_reason = "FLUSH"

if len(self.belts[belt_index][0]) >= self.max_bucket_size:
if flush:
# explicit flush requested
rotate_reason = "FLUSH"

elif len(self.belts[belt_index][0]) >= self.max_bucket_size:
# 0-index bucket is full
need_rotation = True
rotate_reason = "SIZE"

elif now >= self.belts_rotated_at[belt_index] + self.max_interval_between_rotations:
# time interval reached
need_rotation = True
rotate_reason = "TIME"

if not need_rotation:
# belt not rotated
else:
# no need to rotate belt - belt not rotated
return False

# belts needs rotation
# belt(s) needs rotation

# insert empty bucket into the beginning of the belt
self.belts[belt_index].insert(0, [])
Expand All @@ -117,11 +128,23 @@ def rotate_belt(self, belt_index, flush=False):
while len(self.belts[belt_index]) > buckets_num_left_on_belt:
# too many buckets on the belt
# time to rotate belt and flush the most-right-bucket
self.buckets_count += 1

buckets_num = len(self.belts[belt_index])
last_bucket_size = len(self.belts[belt_index][buckets_num-1])
print('rotating belt. now:', now, 'bucket number:', self.buckets_count, 'index:', belt_index, 'reason:', rotate_reason, 'buckets on belt:', buckets_num, 'last bucket size:', last_bucket_size, 'belts count:', len(self.belts))

self.buckets_count += 1
self.items_count += last_bucket_size

logging.info('rot now:%d bktcnt:%d bktcontentcnt: %d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d',
now,
self.buckets_count,
self.items_count,
str(belt_index),
rotate_reason,
buckets_num,
last_bucket_size,
len(self.belts)
)

# time to flush data for specified key
self.writer_builder.param('csv_file_path_suffix_parts', [str(now), str(self.buckets_count)])
Expand All @@ -132,5 +155,22 @@ def rotate_belt(self, belt_index, flush=False):
writer.destroy()
del writer

if self.prev_time is not None:
# have previous time - meaning this is at least second rotate
# can calculate belt speed
window_size = now - self.prev_time
buckets_per_sec = (self.buckets_count - self.prev_buckets_count)/window_size
items_per_sec = (self.items_count - self.prev_items_count) / window_size
logging.info(
'buckets_per_sec:%f items_per_sec:%f for last %d sec',
buckets_per_sec,
items_per_sec,
window_size
)

self.prev_time = now
self.prev_buckets_count = self.buckets_count
self.prev_items_count = self.items_count

# belt rotated
return True
Loading