Merge branch 'master' of github.com:jesse-ai/jesse

This commit is contained in:
Saleh
2020-11-19 19:15:54 +03:30
8 changed files with 290 additions and 2 deletions

View File

@@ -38,6 +38,7 @@ from .frama import frama
from .gatorosc import gatorosc
from .gauss import gauss
from .hma import hma
from .high_pass import high_pass
from .ht_dcperiod import ht_dcperiod
from .ht_dcphase import ht_dcphase
from .ht_phasor import ht_phasor
@@ -75,7 +76,9 @@ from .pivot import pivot
from .ppo import ppo
from .pvi import pvi
from .qstick import qstick
from .reflex import reflex
from .roc import roc
from .roofing import roofing
from .rocp import rocp
from .rocr import rocr
from .rocr100 import rocr100
@@ -93,6 +96,7 @@ from .supertrend import supertrend
from .t3 import t3
from .tema import tema
from .trange import trange
from .trendflex import trendflex
from .trima import trima
from .trix import trix
from .tsf import tsf
@@ -106,6 +110,7 @@ from .vpt import vpt
from .vwma import vwma
from .vwmacd import vwmacd
from .vosc import vosc
from .voss import voss
from .wad import wad
from .wclprice import wclprice
from .wilders import wilders

View File

@@ -0,0 +1,37 @@
import math
from typing import Union
import numpy as np
from jesse.helpers import get_candle_source
def high_pass(candles: np.ndarray, period=48, source_type="close", sequential=False) -> Union[float, np.ndarray]:
"""
High Pass Filter indicator by John F. Ehlers
:param candles: np.ndarray
:param period: int - default=48
:param source_type: str - default: "close"
:param sequential: bool - default=False
:return: float | np.ndarray
"""
if not sequential and len(candles) > 240:
candles = candles[-240:]
source = get_candle_source(candles, source_type=source_type)
hpf = np.full_like(source, 0)
for i in range(source.shape[0]):
if not (i < 2):
alpha_arg = 2 * math.pi / (period * 1.414)
alpha1 = (math.cos(alpha_arg) + math.sin(alpha_arg) - 1) / math.cos(alpha_arg)
hpf[i] = math.pow(1.0 - alpha1 / 2.0, 2) * (source[i] - 2 * source[i - 1] + source[i - 2]) + 2 * (1 - alpha1) * hpf[i - 1] - math.pow(1 - alpha1, 2) * hpf[i - 2]
if sequential:
return hpf
else:
return None if np.isnan(hpf[-1]) else hpf[-1]

View File

@@ -0,0 +1,46 @@
import math
from typing import Union
import numpy as np
from .supersmoother import supersmoother
def reflex(candles: np.ndarray, period=20, source_type="close", sequential=False) -> Union[float, np.ndarray]:
"""
Reflex indicator by John F. Ehlers
:param candles: np.ndarray
:param period: int - default=20
:param source_type: str - default: "close"
:param sequential: bool - default=False
:return: float | np.ndarray
"""
if not sequential and len(candles) > 240:
candles = candles[-240:]
ssf = supersmoother(candles, cutoff=period / 2, source_type=source_type, sequential=True)
rf = np.full_like(ssf, 0)
ms = np.full_like(ssf, 0)
sums = np.full_like(ssf, 0)
for i in range(ssf.shape[0]):
if not (i < period):
slope = (ssf[i - period] - ssf[i]) / period
sum = 0
for t in range(1, period + 1):
sum = sum + (ssf[i] + t * slope) - ssf[i - t]
sum = sum / period
sums[i] = sum
ms[i] = 0.04 * sums[i] * sums[i] + 0.96 * ms[i - 1]
if ms[i] > 0:
rf[i] = sums[i] / math.sqrt(ms[i])
if sequential:
return rf
else:
return None if np.isnan(rf[-1]) else rf[-1]

View File

@@ -0,0 +1,32 @@
from typing import Union
import numpy as np
from .high_pass import high_pass
from .supersmoother import supersmoother
def roofing(candles: np.ndarray, hp_period=48, lp_period=10, source_type="close", sequential=False) -> Union[
float, np.ndarray]:
"""
Roofing Filter indicator by John F. Ehlers
:param candles: np.ndarray
:param period: int - default=20
:param source_type: str - default: "close"
:param sequential: bool - default=False
:return: float | np.ndarray
"""
if not sequential and len(candles) > 240:
candles = candles[-240:]
hpf = high_pass(candles, period=hp_period, source_type=source_type, sequential=True)
res = supersmoother(hpf, cutoff=lp_period, sequential=True)
if sequential:
return res
else:
return None if np.isnan(res[-1]) else res[-1]

View File

@@ -6,7 +6,7 @@ import numpy as np
from jesse.helpers import get_candle_source
def supersmoother(candles: np.ndarray, cutoff=14, source_type="close", sequential=False) -> Union[float, np.ndarray]:
def supersmoother(candles: np.ndarray, cutoff=14, source_type="close", sequential=False) -> Union[float, np.ndarray]:
"""
Super Smoother Filter 2pole Butterworth
This indicator was described by John F. Ehlers
@@ -22,7 +22,12 @@ def supersmoother(candles: np.ndarray, cutoff=14, source_type="close", sequentia
if not sequential and len(candles) > 240:
candles = candles[-240:]
source = get_candle_source(candles, source_type=source_type)
# Accept normal array too.
if len(candles.shape) == 1:
source = candles
else:
source = get_candle_source(candles, source_type=source_type)
source = source.flatten()
N = len(source)
source = source[~np.isnan(source)]

View File

@@ -0,0 +1,46 @@
import math
from typing import Union
import numpy as np
from .supersmoother import supersmoother
def trendflex(candles: np.ndarray, period=20, source_type="close", sequential=False) -> Union[float, np.ndarray]:
"""
Trendflex indicator by John F. Ehlers
:param candles: np.ndarray
:param period: int - default=20
:param source_type: str - default: "close"
:param sequential: bool - default=False
:return: float | np.ndarray
"""
if not sequential and len(candles) > 240:
candles = candles[-240:]
ssf = supersmoother(candles, cutoff=period / 2, source_type=source_type, sequential=True)
tf = np.full_like(ssf, 0)
ms = np.full_like(ssf, 0)
sums = np.full_like(ssf, 0)
for i in range(ssf.shape[0]):
if not (i < period):
sum = 0
for t in range(1, period + 1):
sum = sum + ssf[i] - ssf[i - t]
sum = sum / period
sums[i] = sum
ms[i] = 0.04 * sums[i] * sums[i] + 0.96 * ms[i - 1]
if ms[i] != 0:
tf[i] = sums[i] / math.sqrt(ms[i])
if sequential:
return tf
else:
return None if np.isnan(tf[-1]) else tf[-1]

54
jesse/indicators/voss.py Normal file
View File

@@ -0,0 +1,54 @@
import math
from collections import namedtuple
import numpy as np
from jesse.helpers import get_candle_source
VossFilter = namedtuple('VossFilter', ['voss', 'filt' ])
def voss(candles: np.ndarray, period=20, predict=3, bandwith=0.25, source_type="close", sequential=False) -> VossFilter:
"""
Voss indicator by John F. Ehlers
:param candles: np.ndarray
:param period: int - default=20
:param predict: int - default=3
:param bandwith: float - default=0.25
:param source_type: str - default: "close"
:param sequential: bool - default=False
:return: float | np.ndarray
"""
if not sequential and len(candles) > 240:
candles = candles[-240:]
source = get_candle_source(candles, source_type=source_type)
voss = np.full_like(source, 0)
filt = np.full_like(source, 0)
pi = math.pi
order = 3 * predict
f1 = math.cos(2 * pi / period)
g1 = math.cos(bandwith * 2 * pi / period)
s1 = 1 / g1 - math.sqrt(1 / (g1 * g1) - 1)
for i in range(source.shape[0]):
if not (i <= period or i <= 5 or i <= order):
filt[i] = 0.5 * (1 - s1) * (source[i] - source[i - 2]) + f1 * (1 + s1) * filt[i - 1] - s1 * filt[i - 2]
for i in range(source.shape[0]):
if not (i <= period or i <= 5 or i <= order):
sumc = 0
for count in range(order):
sumc = sumc + ((count + 1) / float(order)) * voss[i - (order - count)]
voss[i] = ((3 + order) / 2) * filt[i] - sumc
if sequential:
return VossFilter(voss, filt)
else:
return VossFilter(voss[-1], filt[-1])

View File

@@ -502,6 +502,15 @@ def test_hma():
assert seq[-1] == single
def test_high_pass():
candles = np.array(mama_candles)
single = ta.high_pass(candles)
seq = ta.high_pass(candles, sequential=True)
assert round(single, 0) == -101
assert len(seq) == len(candles)
assert seq[-1] == single
def test_ht_dcperiod():
candles = np.array(mama_candles)
single = ta.ht_dcperiod(candles)
@@ -1073,6 +1082,18 @@ def test_qstick():
assert seq[-1] == single
def test_reflex():
# use the same candles as mama_candles
candles = np.array(mama_candles)
single = ta.reflex(candles)
seq = ta.reflex(candles, sequential=True)
assert round(single, 2) == -0.55
assert len(seq) == len(candles)
assert seq[-1] == single
def test_roc():
# use the same candles as mama_candles
candles = np.array(mama_candles)
@@ -1085,6 +1106,18 @@ def test_roc():
assert seq[-1] == single
def test_roofing():
# use the same candles as mama_candles
candles = np.array(mama_candles)
single = ta.roofing(candles)
seq = ta.roofing(candles, sequential=True)
assert round(single, 0) == -36
assert len(seq) == len(candles)
assert seq[-1] == single
def test_rocp():
# use the same candles as mama_candles
candles = np.array(mama_candles)
@@ -1303,6 +1336,17 @@ def test_trange():
assert seq[-1] == single
def test_trendflex():
candles = np.array(mama_candles)
single = ta.trendflex(candles)
seq = ta.trendflex(candles, sequential=True)
assert round(single, 2) == -1.48
assert len(seq) == len(candles)
assert seq[-1] == single
def test_trima():
# use the same candles as mama_candles
candles = np.array(mama_candles)
@@ -1420,6 +1464,24 @@ def test_vosc():
assert len(seq) == len(candles)
assert seq[-1] == single
def test_voss():
# use the same candles as mama_candles
candles = np.array(mama_candles)
single = ta.voss(candles)
seq = ta.voss(candles, sequential=True)
assert type(single).__name__ == 'VossFilter'
assert round(single.voss, 2) == -30.71
assert round(single.filt, 2) == -5.98
assert seq.voss[-1] == single.voss
assert seq.filt[-1] == single.filt
assert len(seq.voss) == len(candles)
assert len(seq.filt) == len(candles)
def test_vpt():
candles = np.array(mama_candles)
single = ta.vpt(candles)
@@ -1429,6 +1491,7 @@ def test_vpt():
assert len(seq) == len(candles)
assert seq[-1] == single
def test_vwma():
candles = np.array(vwma_candles)
single = ta.vwma(candles)