Implements port forwarding for odo dev (#5526)

* First pass of port-forward for odo dev

* Supports forwarding multiple ports/endpoints

* Change message wording

* Port forwarding happens port 40001 onward

* Organize the code

* Integration tests

* Update the CLI help message

* Catch and check err

* Changes for failing unit tests

* Fix test and add newline to a message

* Remote label and use for loop

* Adds TODO to cleanup when port-forwarding fails

* Change messaging printed on the CLI

* Break Start method into multiple methods

* Set debug port while creating push parameters in dev

* Test after making a change to a file in pwd

* Refactoring to put things where they should be

* Notify user to run odo dev again if endpoints change

* Unit tests and rebase mistakes

* Add newline character for clear formatting

* Create business logic for reuse to find endpoints

* Fix unit and integration test failures

* Self review

* Changes based on Philippe's review

* Two more changes based on Philippe's review

* More changes based on PR review

* Changes based on PR feedback and more

- Renames all receivers in kclient package to `c`
- Adds mocks

* Removes unnecessary function

* Make function more generic for reuse

* Rename Options to DevOptions

* Address review comments by Tomas

* Fix failing unit test

* Modify message to restart odo dev

* Use helper.RunDevMode in odo dev tests

* Remove doubly imported log package

* Modifies clientset at CLI and business layer

- No need of KUBERNETES client for `odo dev` business layer.
- Added KUBERNETES client for `odo dev` CLI layer.

* Remove hardcoding of URL in tests

- It uses @feloy's work in PR #5570.

* Uses regexp to find strings.

* Makes FindAllMatchingStrings more generally usable

* Adds constant for localhost regex

* Replace matches with urls and remove unnecessary Sprintf

* Documents what urls is in RunDevMode function.

* Update tests/helper/helper_dev.go

Co-authored-by: Philippe Martin <contact@elol.fr>

* Update tests/helper/helper_dev.go

Co-authored-by: Philippe Martin <contact@elol.fr>

* Update tests/helper/helper_dev.go

Co-authored-by: Philippe Martin <contact@elol.fr>

* Update tests/helper/helper_dev.go

Co-authored-by: Philippe Martin <contact@elol.fr>

Co-authored-by: Philippe Martin <contact@elol.fr>
This commit is contained in:
Dharmit Shah
2022-03-25 19:44:13 +05:30
committed by GitHub
parent 80d7cfde30
commit 0396b435c3
26 changed files with 1442 additions and 148 deletions

View File

@@ -3,12 +3,12 @@ package dev
import ( import (
"io" "io"
"github.com/redhat-developer/odo/pkg/envinfo"
"github.com/devfile/library/pkg/devfile/parser" "github.com/devfile/library/pkg/devfile/parser"
"github.com/redhat-developer/odo/pkg/devfile/adapters" "github.com/redhat-developer/odo/pkg/devfile/adapters"
"github.com/redhat-developer/odo/pkg/devfile/adapters/common" "github.com/redhat-developer/odo/pkg/devfile/adapters/common"
"github.com/redhat-developer/odo/pkg/devfile/adapters/kubernetes" "github.com/redhat-developer/odo/pkg/devfile/adapters/kubernetes"
"github.com/redhat-developer/odo/pkg/envinfo"
"github.com/redhat-developer/odo/pkg/log"
"github.com/redhat-developer/odo/pkg/watch" "github.com/redhat-developer/odo/pkg/watch"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@@ -26,25 +26,21 @@ func NewDevClient(watchClient watch.Client) *DevClient {
} }
} }
// Start the resources in devfileObj on the platformContext. It then pushes the files in path to the container, func (o *DevClient) Start(devfileObj parser.DevfileObj, platformContext kubernetes.KubernetesContext, path string) error {
// and watches it for any changes. It prints all the logs/output to out.
func (o *DevClient) Start(devfileObj parser.DevfileObj, platformContext kubernetes.KubernetesContext, ignorePaths []string, path string, out io.Writer, h Handler) error {
var err error
var adapter common.ComponentAdapter
klog.V(4).Infoln("Creating new adapter") klog.V(4).Infoln("Creating new adapter")
adapter, err = adapters.NewComponentAdapter(devfileObj.GetMetadataName(), path, "app", devfileObj, platformContext) adapter, err := adapters.NewComponentAdapter(devfileObj.GetMetadataName(), path, "app", devfileObj, platformContext)
if err != nil { if err != nil {
return err return err
} }
var envSpecificInfo *envinfo.EnvSpecificInfo envSpecificInfo, err := envinfo.NewEnvSpecificInfo(path)
envSpecificInfo, err = envinfo.NewEnvSpecificInfo(path)
if err != nil { if err != nil {
return err return err
} }
pushParameters := common.PushParameters{ pushParameters := common.PushParameters{
EnvSpecificInfo: *envSpecificInfo, EnvSpecificInfo: *envSpecificInfo,
DebugPort: envSpecificInfo.GetDebugPort(),
Path: path, Path: path,
} }
@@ -53,8 +49,20 @@ func (o *DevClient) Start(devfileObj parser.DevfileObj, platformContext kubernet
if err != nil { if err != nil {
return err return err
} }
klog.V(4).Infoln("Successfully created inner-loop resourcs") klog.V(4).Infoln("Successfully created inner-loop resources")
log.Finfof(out, "\nYour application is now running on your cluster.") return nil
}
func (o *DevClient) Cleanup() error {
var err error
return err
}
func (o *DevClient) Watch(devfileObj parser.DevfileObj, path string, ignorePaths []string, out io.Writer, h Handler) error {
envSpecificInfo, err := envinfo.NewEnvSpecificInfo(path)
if err != nil {
return err
}
watchParameters := watch.WatchParameters{ watchParameters := watch.WatchParameters{
Path: path, Path: path,
@@ -64,13 +72,8 @@ func (o *DevClient) Start(devfileObj parser.DevfileObj, platformContext kubernet
DevfileWatchHandler: h.RegenerateAdapterAndPush, DevfileWatchHandler: h.RegenerateAdapterAndPush,
EnvSpecificInfo: envSpecificInfo, EnvSpecificInfo: envSpecificInfo,
FileIgnores: ignorePaths, FileIgnores: ignorePaths,
InitialDevfileObj: devfileObj,
} }
return o.watchClient.WatchAndPush(out, watchParameters) return o.watchClient.WatchAndPush(out, watchParameters)
} }
// Cleanup cleans the resources created by Push
func (o *DevClient) Cleanup() error {
var err error
return err
}

View File

@@ -12,7 +12,15 @@ import (
) )
type Client interface { type Client interface {
Start(d parser.DevfileObj, platformContext kubernetes.KubernetesContext, ignorePaths []string, path string, w io.Writer, h Handler) error // Start the resources in devfileObj on the platformContext. It then pushes the files in path to the container.
Start(devfileObj parser.DevfileObj, platformContext kubernetes.KubernetesContext, path string) error
// Watch watches for any changes to the files under path while ignoring the files/directories in ignorePaths.
// It logs messages to out and uses the Handler h to perform push operation when anything changes in path.
// It uses devfileObj to notify user to restart odo dev if they change endpoint information in the devfile.
Watch(devfileObj parser.DevfileObj, path string, ignorePaths []string, out io.Writer, h Handler) error
// Cleanup cleans the resources created by Start
Cleanup() error Cleanup() error
} }

118
pkg/dev/mock.go Normal file
View File

@@ -0,0 +1,118 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: pkg/dev/interface.go
// Package dev is a generated GoMock package.
package dev
import (
io "io"
reflect "reflect"
parser "github.com/devfile/library/pkg/devfile/parser"
gomock "github.com/golang/mock/gomock"
common "github.com/redhat-developer/odo/pkg/devfile/adapters/common"
kubernetes "github.com/redhat-developer/odo/pkg/devfile/adapters/kubernetes"
watch "github.com/redhat-developer/odo/pkg/watch"
)
// MockClient is a mock of Client interface.
type MockClient struct {
ctrl *gomock.Controller
recorder *MockClientMockRecorder
}
// MockClientMockRecorder is the mock recorder for MockClient.
type MockClientMockRecorder struct {
mock *MockClient
}
// NewMockClient creates a new mock instance.
func NewMockClient(ctrl *gomock.Controller) *MockClient {
mock := &MockClient{ctrl: ctrl}
mock.recorder = &MockClientMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockClient) EXPECT() *MockClientMockRecorder {
return m.recorder
}
// Cleanup mocks base method.
func (m *MockClient) Cleanup() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Cleanup")
ret0, _ := ret[0].(error)
return ret0
}
// Cleanup indicates an expected call of Cleanup.
func (mr *MockClientMockRecorder) Cleanup() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cleanup", reflect.TypeOf((*MockClient)(nil).Cleanup))
}
// Start mocks base method.
func (m *MockClient) Start(devfileObj parser.DevfileObj, platformContext kubernetes.KubernetesContext, path string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Start", devfileObj, platformContext, path)
ret0, _ := ret[0].(error)
return ret0
}
// Start indicates an expected call of Start.
func (mr *MockClientMockRecorder) Start(devfileObj, platformContext, path interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockClient)(nil).Start), devfileObj, platformContext, path)
}
// Watch mocks base method.
func (m *MockClient) Watch(devfileObj parser.DevfileObj, path string, ignorePaths []string, out io.Writer, h Handler) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Watch", devfileObj, path, ignorePaths, out, h)
ret0, _ := ret[0].(error)
return ret0
}
// Watch indicates an expected call of Watch.
func (mr *MockClientMockRecorder) Watch(devfileObj, path, ignorePaths, out, h interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockClient)(nil).Watch), devfileObj, path, ignorePaths, out, h)
}
// MockHandler is a mock of Handler interface.
type MockHandler struct {
ctrl *gomock.Controller
recorder *MockHandlerMockRecorder
}
// MockHandlerMockRecorder is the mock recorder for MockHandler.
type MockHandlerMockRecorder struct {
mock *MockHandler
}
// NewMockHandler creates a new mock instance.
func NewMockHandler(ctrl *gomock.Controller) *MockHandler {
mock := &MockHandler{ctrl: ctrl}
mock.recorder = &MockHandlerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockHandler) EXPECT() *MockHandlerMockRecorder {
return m.recorder
}
// RegenerateAdapterAndPush mocks base method.
func (m *MockHandler) RegenerateAdapterAndPush(arg0 common.PushParameters, arg1 watch.WatchParameters) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RegenerateAdapterAndPush", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// RegenerateAdapterAndPush indicates an expected call of RegenerateAdapterAndPush.
func (mr *MockHandlerMockRecorder) RegenerateAdapterAndPush(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegenerateAdapterAndPush", reflect.TypeOf((*MockHandler)(nil).RegenerateAdapterAndPush), arg0, arg1)
}

View File

@@ -107,6 +107,12 @@ type ClientInterface interface {
GetOnePodFromSelector(selector string) (*corev1.Pod, error) GetOnePodFromSelector(selector string) (*corev1.Pod, error)
GetPodLogs(podName, containerName string, followLog bool) (io.ReadCloser, error) GetPodLogs(podName, containerName string, followLog bool) (io.ReadCloser, error)
// port_forwarding.go
// SetupPortForwarding creates port-forwarding for the pod on the port pairs provided in the
// ["<localhost-port>":"<remote-pod-port>"] format. errOut is used by the client-go library to output any errors
// encountered while the port-forwarding is running
SetupPortForwarding(pod *corev1.Pod, portPairs []string, errOut io.Writer) error
// projects.go // projects.go
CreateNewProject(projectName string, wait bool) error CreateNewProject(projectName string, wait bool) error
DeleteProject(name string, wait bool) error DeleteProject(name string, wait bool) error

View File

@@ -1278,6 +1278,20 @@ func (mr *MockClientInterfaceMockRecorder) SetNamespace(ns interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetNamespace", reflect.TypeOf((*MockClientInterface)(nil).SetNamespace), ns) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetNamespace", reflect.TypeOf((*MockClientInterface)(nil).SetNamespace), ns)
} }
// SetupPortForwarding mocks base method.
func (m *MockClientInterface) SetupPortForwarding(pod *v12.Pod, portPairs []string, errOut io.Writer) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetupPortForwarding", pod, portPairs, errOut)
ret0, _ := ret[0].(error)
return ret0
}
// SetupPortForwarding indicates an expected call of SetupPortForwarding.
func (mr *MockClientInterfaceMockRecorder) SetupPortForwarding(pod, portPairs, errOut interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetupPortForwarding", reflect.TypeOf((*MockClientInterface)(nil).SetupPortForwarding), pod, portPairs, errOut)
}
// UnlinkSecret mocks base method. // UnlinkSecret mocks base method.
func (m *MockClientInterface) UnlinkSecret(secretName, componentName, applicationName string) error { func (m *MockClientInterface) UnlinkSecret(secretName, componentName, applicationName string) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@@ -179,10 +179,10 @@ func addParam(schema *spec.Schema, param olm.SpecDescriptor) {
} }
// GetRestMappingFromUnstructured returns rest mappings from unstructured data // GetRestMappingFromUnstructured returns rest mappings from unstructured data
func (client *Client) GetRestMappingFromUnstructured(u unstructured.Unstructured) (*meta.RESTMapping, error) { func (c *Client) GetRestMappingFromUnstructured(u unstructured.Unstructured) (*meta.RESTMapping, error) {
gvk := u.GroupVersionKind() gvk := u.GroupVersionKind()
cfg := client.GetClientConfig() cfg := c.GetClientConfig()
dc, err := discovery.NewDiscoveryClientForConfig(cfg) dc, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil { if err != nil {
@@ -194,11 +194,11 @@ func (client *Client) GetRestMappingFromUnstructured(u unstructured.Unstructured
} }
// GetOperatorGVRList creates a slice of rest mappings that are provided by Operators (CSV) // GetOperatorGVRList creates a slice of rest mappings that are provided by Operators (CSV)
func (client *Client) GetOperatorGVRList() ([]meta.RESTMapping, error) { func (c *Client) GetOperatorGVRList() ([]meta.RESTMapping, error) {
var operatorGVRList []meta.RESTMapping var operatorGVRList []meta.RESTMapping
// ignoring the error because // ignoring the error because
csvs, err := client.ListClusterServiceVersions() csvs, err := c.ListClusterServiceVersions()
if err != nil { if err != nil {
return operatorGVRList, err return operatorGVRList, err
} }

View File

@@ -0,0 +1,38 @@
package kclient
import (
"io"
"net/http"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)
func (c *Client) SetupPortForwarding(pod *corev1.Pod, portPairs []string, errOut io.Writer) error {
transport, upgrader, err := spdy.RoundTripperFor(c.GetClientConfig())
if err != nil {
return err
}
req := c.GeneratePortForwardReq(pod.Name)
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())
stopChan := make(chan struct{}, 1)
// passing nil for readyChan because it's eventually being closed if it's not nil
// passing nil for out because we only care for error, not for output messages; we want to print our own messages
fw, err := portforward.NewOnAddresses(dialer, []string{"localhost"}, portPairs, stopChan, nil, nil, errOut)
if err != nil {
return err
}
// start port-forwarding
err = fw.ForwardPorts()
if err != nil {
// do cleanup when this happens
// TODO: #5485
return err
}
return nil
}

View File

@@ -73,3 +73,13 @@ func NewComponentsWithSameNameError() ComponentsWithSameNameError {
func (e ComponentsWithSameNameError) Error() string { func (e ComponentsWithSameNameError) Error() string {
return "more than one component with the same name, should not happen" return "more than one component with the same name, should not happen"
} }
type NotAContainerError struct{}
func NewNotAContainerError() NotAContainerError {
return NotAContainerError{}
}
func (e NotAContainerError) Error() string {
return "component not a container"
}

View File

@@ -119,3 +119,55 @@ func execDevfileEvent(devfileObj parser.DevfileObj, events []string, handler Han
} }
return nil return nil
} }
// GetContainerEndpointMapping returns a map of container names and slice of its endpoints (in int) with exposure status other than none
func GetContainerEndpointMapping(containers []v1alpha2.Component) map[string][]int {
ceMapping := make(map[string][]int)
for _, container := range containers {
if container.ComponentUnion.Container == nil {
// this is not a container component; continue prevents panic when accessing Endpoints field
continue
}
k := container.Name
if _, ok := ceMapping[k]; !ok {
ceMapping[k] = []int{}
}
endpoints := container.Container.Endpoints
for _, e := range endpoints {
if e.Exposure != v1alpha2.NoneEndpointExposure {
ceMapping[k] = append(ceMapping[k], e.TargetPort)
}
}
}
return ceMapping
}
// GetEndpointsFromDevfile returns a slice of all endpoints in a devfile and ignores the endpoints with exposure values in ignoreExposures
func GetEndpointsFromDevfile(devfileObj parser.DevfileObj, ignoreExposures []v1alpha2.EndpointExposure) ([]v1alpha2.Endpoint, error) {
containers, err := devfileObj.Data.GetComponents(common.DevfileOptions{
ComponentOptions: common.ComponentOptions{ComponentType: v1alpha2.ContainerComponentType},
})
if err != nil {
return nil, err
}
var allEndpoints []v1alpha2.Endpoint
for _, c := range containers {
allEndpoints = append(allEndpoints, c.Container.Endpoints...)
}
var endpoints []v1alpha2.Endpoint
for _, e := range allEndpoints {
ignore := false
for _, i := range ignoreExposures {
if e.Exposure == i {
ignore = true
}
}
if !ignore {
endpoints = append(endpoints, e)
}
}
return endpoints, nil
}

View File

@@ -248,3 +248,191 @@ func TestDeploy(t *testing.T) {
}) })
} }
} }
func TestGetContainerEndpointMapping(t *testing.T) {
type args struct {
containers []v1alpha2.Component
}
imageComponent := generator.GetImageComponent(generator.ImageComponentParams{
Name: "image-component",
Image: v1alpha2.Image{
ImageName: "an-image-name",
},
})
containerWithNoEndpoints := generator.GetContainerComponent(generator.ContainerComponentParams{
Name: "container 1",
Endpoints: nil,
})
containerWithOnePublicEndpoint := generator.GetContainerComponent(generator.ContainerComponentParams{
Name: "container 2",
Endpoints: []v1alpha2.Endpoint{
{
Name: "ep1",
TargetPort: 8080,
Exposure: v1alpha2.PublicEndpointExposure,
},
},
})
containerWithOneInternalEndpoint := generator.GetContainerComponent(generator.ContainerComponentParams{
Name: "container 3",
Endpoints: []v1alpha2.Endpoint{
{
Name: "ep2",
TargetPort: 9090,
Exposure: v1alpha2.InternalEndpointExposure,
},
},
})
tests := []struct {
name string
args args
want map[string][]int
}{
{
name: "invalid input - image components instead of container components",
args: args{
containers: []v1alpha2.Component{imageComponent},
},
want: map[string][]int{},
},
{
name: "one container with no endpoints exposed",
args: args{
containers: []v1alpha2.Component{containerWithNoEndpoints},
},
want: map[string][]int{containerWithNoEndpoints.Name: {}},
},
{
name: "multiple containers with varying types of endpoints",
args: args{
containers: []v1alpha2.Component{containerWithNoEndpoints, containerWithOnePublicEndpoint, containerWithOneInternalEndpoint},
},
want: map[string][]int{containerWithNoEndpoints.Name: {}, containerWithOnePublicEndpoint.Name: {8080}, containerWithOneInternalEndpoint.Name: {9090}},
},
{
name: "invalid input - one image component with rest being containers",
args: args{
containers: []v1alpha2.Component{containerWithNoEndpoints, containerWithOnePublicEndpoint, containerWithOneInternalEndpoint, imageComponent},
},
want: map[string][]int{containerWithNoEndpoints.Name: {}, containerWithOnePublicEndpoint.Name: {8080}, containerWithOneInternalEndpoint.Name: {9090}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := GetContainerEndpointMapping(tt.args.containers)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetContainerEndpointMapping() got = %v, want %v", got, tt.want)
}
})
}
}
func TestGetEndpointsFromDevfile(t *testing.T) {
type args struct {
devfileObj func() parser.DevfileObj
ignoreExposures []v1alpha2.EndpointExposure
}
ep1 := v1alpha2.Endpoint{Name: "ep1", TargetPort: 8080, Exposure: v1alpha2.NoneEndpointExposure}
ep2 := v1alpha2.Endpoint{Name: "ep2", TargetPort: 9090, Exposure: v1alpha2.InternalEndpointExposure}
ep3 := v1alpha2.Endpoint{Name: "ep3", TargetPort: 8888, Exposure: v1alpha2.PublicEndpointExposure}
container := generator.GetContainerComponent(generator.ContainerComponentParams{
Name: "container-1",
Endpoints: []v1alpha2.Endpoint{ep1, ep2, ep3},
})
tests := []struct {
name string
args args
want []v1alpha2.Endpoint
wantErr bool
}{
{
name: "Ignore exposure of type none",
args: args{
devfileObj: func() parser.DevfileObj {
data, _ := data.NewDevfileData(string(data.APISchemaVersion200))
_ = data.AddComponents([]v1alpha2.Component{container})
return parser.DevfileObj{
Data: data,
}
},
ignoreExposures: []v1alpha2.EndpointExposure{v1alpha2.NoneEndpointExposure},
},
want: []v1alpha2.Endpoint{ep2, ep3},
},
{
name: "Ignore exposure of type public",
args: args{
devfileObj: func() parser.DevfileObj {
data, _ := data.NewDevfileData(string(data.APISchemaVersion200))
_ = data.AddComponents([]v1alpha2.Component{container})
return parser.DevfileObj{
Data: data,
}
},
ignoreExposures: []v1alpha2.EndpointExposure{v1alpha2.PublicEndpointExposure},
},
want: []v1alpha2.Endpoint{ep1, ep2},
},
{
name: "Ignore exposure of type internal",
args: args{
devfileObj: func() parser.DevfileObj {
data, _ := data.NewDevfileData(string(data.APISchemaVersion200))
_ = data.AddComponents([]v1alpha2.Component{container})
return parser.DevfileObj{
Data: data,
}
},
ignoreExposures: []v1alpha2.EndpointExposure{v1alpha2.InternalEndpointExposure},
},
want: []v1alpha2.Endpoint{ep1, ep3},
},
{
name: "Ignore none",
args: args{
devfileObj: func() parser.DevfileObj {
data, _ := data.NewDevfileData(string(data.APISchemaVersion200))
_ = data.AddComponents([]v1alpha2.Component{container})
return parser.DevfileObj{
Data: data,
}
},
ignoreExposures: []v1alpha2.EndpointExposure{},
},
want: []v1alpha2.Endpoint{ep1, ep2, ep3},
},
{
name: "Ignore all exposure types",
args: args{
devfileObj: func() parser.DevfileObj {
data, _ := data.NewDevfileData(string(data.APISchemaVersion200))
_ = data.AddComponents([]v1alpha2.Component{container})
return parser.DevfileObj{
Data: data,
}
},
ignoreExposures: []v1alpha2.EndpointExposure{v1alpha2.InternalEndpointExposure, v1alpha2.NoneEndpointExposure, v1alpha2.PublicEndpointExposure},
},
want: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := GetEndpointsFromDevfile(tt.args.devfileObj(), tt.args.ignoreExposures)
if (err != nil) != tt.wantErr {
t.Errorf("GetEndpointsFromDevfile() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetEndpointsFromDevfile() got = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -6,12 +6,21 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"reflect"
"strings"
"github.com/redhat-developer/odo/pkg/log"
"github.com/devfile/api/v2/pkg/apis/workspaces/v1alpha2"
parsercommon "github.com/devfile/library/pkg/devfile/parser/data/v2/common"
"github.com/devfile/library/pkg/devfile/parser" "github.com/devfile/library/pkg/devfile/parser"
"github.com/redhat-developer/odo/pkg/libdevfile"
"github.com/redhat-developer/odo/pkg/util"
"github.com/redhat-developer/odo/pkg/devfile/adapters" "github.com/redhat-developer/odo/pkg/devfile/adapters"
"github.com/redhat-developer/odo/pkg/devfile/adapters/common" "github.com/redhat-developer/odo/pkg/devfile/adapters/common"
"github.com/redhat-developer/odo/pkg/devfile/location" "github.com/redhat-developer/odo/pkg/devfile/location"
"github.com/redhat-developer/odo/pkg/log"
"github.com/redhat-developer/odo/pkg/version" "github.com/redhat-developer/odo/pkg/version"
"github.com/redhat-developer/odo/pkg/watch" "github.com/redhat-developer/odo/pkg/watch"
@@ -41,20 +50,21 @@ type DevOptions struct {
// Variables // Variables
ignorePaths []string ignorePaths []string
out io.Writer out io.Writer
errOut io.Writer
// it's called "initial" because it has to be set only once when running odo dev for the first time
// it is used to compare with updated devfile when we watch the contextDir for changes
initialDevfileObj parser.DevfileObj
// working directory // working directory
contextDir string contextDir string
} }
type DevHandler struct{} type Handler struct{}
func NewDevHandler() *DevHandler {
return &DevHandler{}
}
func NewDevOptions() *DevOptions { func NewDevOptions() *DevOptions {
return &DevOptions{ return &DevOptions{
out: log.GetStdout(), out: log.GetStdout(),
errOut: log.GetStderr(),
} }
} }
@@ -102,12 +112,14 @@ func (o *DevOptions) Complete(cmdline cmdline.Cmdline, args []string) error {
return fmt.Errorf("unable to create context: %v", err) return fmt.Errorf("unable to create context: %v", err)
} }
envFileInfo, err := envinfo.NewEnvSpecificInfo("") envfileinfo, err := envinfo.NewEnvSpecificInfo("")
if err != nil { if err != nil {
return fmt.Errorf("unable to retrieve configuration information: %v", err) return fmt.Errorf("unable to retrieve configuration information: %v", err)
} }
if !envFileInfo.Exists() { o.initialDevfileObj = o.Context.EnvSpecificInfo.GetDevfileObj()
if !envfileinfo.Exists() {
// if env.yaml doesn't exist, get component name from the devfile.yaml // if env.yaml doesn't exist, get component name from the devfile.yaml
var cmpName string var cmpName string
cmpName, err = component.GatherName(o.EnvSpecificInfo.GetDevfileObj(), o.GetDevfilePath()) cmpName, err = component.GatherName(o.EnvSpecificInfo.GetDevfileObj(), o.GetDevfilePath())
@@ -116,17 +128,18 @@ func (o *DevOptions) Complete(cmdline cmdline.Cmdline, args []string) error {
} }
// create env.yaml file with component, project/namespace and application info // create env.yaml file with component, project/namespace and application info
// TODO - store only namespace into env.yaml, we don't want to track component or application name via env.yaml // TODO - store only namespace into env.yaml, we don't want to track component or application name via env.yaml
err = envFileInfo.SetComponentSettings(envinfo.ComponentSettings{Name: cmpName, Project: o.GetProject(), AppName: "app"}) err = envfileinfo.SetComponentSettings(envinfo.ComponentSettings{Name: cmpName, Project: o.GetProject(), AppName: "app"})
if err != nil { if err != nil {
return fmt.Errorf("failed to write new env.yaml file: %w", err) return fmt.Errorf("failed to write new env.yaml file: %w", err)
} }
} else if envFileInfo.GetComponentSettings().Project != o.GetProject() { } else if envfileinfo.GetComponentSettings().Project != o.GetProject() {
// set namespace if the evn.yaml exists; that's the only piece we care about in env.yaml // set namespace if the evn.yaml exists; that's the only piece we care about in env.yaml
err = envFileInfo.SetConfiguration("project", o.GetProject()) err = envfileinfo.SetConfiguration("project", o.GetProject())
if err != nil { if err != nil {
return fmt.Errorf("failed to update project in env.yaml file: %w", err) return fmt.Errorf("failed to update project in env.yaml file: %w", err)
} }
} }
o.clientset.KubernetesClient.SetNamespace(o.GetProject())
// 3 steps to evaluate the paths to be ignored when "watching" the pwd/cwd for changes // 3 steps to evaluate the paths to be ignored when "watching" the pwd/cwd for changes
// 1. create an empty string slice to which paths like .gitignore, .odo/odo-file-index.json, etc. will be added // 1. create an empty string slice to which paths like .gitignore, .odo/odo-file-index.json, etc. will be added
@@ -166,12 +179,44 @@ func (o *DevOptions) Run() error {
"odo version: "+version.VERSION) "odo version: "+version.VERSION)
log.Section("Deploying to the cluster in developer mode") log.Section("Deploying to the cluster in developer mode")
d := DevHandler{} err = o.clientset.DevClient.Start(o.Context.EnvSpecificInfo.GetDevfileObj(), platformContext, path)
err = o.clientset.DevClient.Start(o.Context.EnvSpecificInfo.GetDevfileObj(), platformContext, o.ignorePaths, path, log.GetStdout(), &d) if err != nil {
return err
}
fmt.Fprintf(o.out, "\nYour application is running on cluster.\n ")
// get the endpoint/port information for containers in devfile and setup port-forwarding
containers, err := o.Context.EnvSpecificInfo.GetDevfileObj().Data.GetComponents(parsercommon.DevfileOptions{
ComponentOptions: parsercommon.ComponentOptions{ComponentType: v1alpha2.ContainerComponentType},
})
if err != nil {
return err
}
ceMapping := libdevfile.GetContainerEndpointMapping(containers)
portPairs := portPairsFromContainerEndpoints(ceMapping)
var portPairsSlice []string
for _, v1 := range portPairs {
portPairsSlice = append(portPairsSlice, v1...)
}
pod, err := o.clientset.KubernetesClient.GetPodUsingComponentName(o.Context.EnvSpecificInfo.GetDevfileObj().GetMetadataName())
if err != nil {
return err
}
go func() {
err = o.clientset.KubernetesClient.SetupPortForwarding(pod, portPairsSlice, o.errOut)
if err != nil {
fmt.Printf("failed to setup port-forwarding: %v\n", err)
}
}()
printPortForwardingInfo(portPairs, o.out)
d := Handler{}
err = o.clientset.DevClient.Watch(o.Context.EnvSpecificInfo.GetDevfileObj(), path, o.ignorePaths, o.out, &d)
return err return err
} }
func (o *DevHandler) RegenerateAdapterAndPush(pushParams common.PushParameters, watchParams watch.WatchParameters) error { // RegenerateAdapterAndPush regenerates the adapter and pushes the files to remote pod
func (o *Handler) RegenerateAdapterAndPush(pushParams common.PushParameters, watchParams watch.WatchParameters) error {
var adapter common.ComponentAdapter var adapter common.ComponentAdapter
adapter, err := regenerateComponentAdapterFromWatchParams(watchParams) adapter, err := regenerateComponentAdapterFromWatchParams(watchParams)
@@ -188,29 +233,44 @@ func (o *DevHandler) RegenerateAdapterAndPush(pushParams common.PushParameters,
} }
func regenerateComponentAdapterFromWatchParams(parameters watch.WatchParameters) (common.ComponentAdapter, error) { func regenerateComponentAdapterFromWatchParams(parameters watch.WatchParameters) (common.ComponentAdapter, error) {
// Parse devfile and validate. Path is hard coded because odo expects devfile.yaml to be present in the pwd/cwd.
devObj, err := ododevfile.ParseAndValidateFromFile(location.DevfileLocation("")) devObj, err := ododevfile.ParseAndValidateFromFile(location.DevfileLocation(""))
if err != nil { if err != nil {
return nil, err return nil, err
} }
if !reflect.DeepEqual(parameters.InitialDevfileObj, devObj) {
log.Warningf("devfile.yaml has been changed; please restart the `odo dev` command\n\n")
}
platformContext := kubernetes.KubernetesContext{ platformContext := kubernetes.KubernetesContext{
Namespace: parameters.EnvSpecificInfo.GetNamespace(), Namespace: parameters.EnvSpecificInfo.GetNamespace(),
} }
return adapters.NewComponentAdapter(parameters.ComponentName, parameters.Path, parameters.ApplicationName, devObj, platformContext) return adapters.NewComponentAdapter(parameters.ComponentName, parameters.Path, parameters.ApplicationName, devObj, platformContext)
}
func printPortForwardingInfo(portPairs map[string][]string, out io.Writer) {
var portForwardURLs strings.Builder
for container, ports := range portPairs {
for _, pair := range ports {
split := strings.Split(pair, ":")
local := split[0]
remote := split[1]
portForwardURLs.WriteString(fmt.Sprintf("- Port %s from %q container forwarded to localhost:%s\n", remote, container, local))
}
}
fmt.Fprintf(out, "\n%s", portForwardURLs.String())
} }
// NewCmdDev implements the odo dev command // NewCmdDev implements the odo dev command
func NewCmdDev(name, fullName string) *cobra.Command { func NewCmdDev(name, fullName string) *cobra.Command {
o := NewDevOptions() o := NewDevOptions()
devCmd := &cobra.Command{ devCmd := &cobra.Command{
Use: name, Use: name,
Short: "Deploy component to development cluster", Short: "Deploy component to development cluster",
Long: "Deploy the component to a development cluster. odo dev is a long running command that will automatically sync your source to the cluster", Long: `odo dev is a long running command that will automatically sync your source to the cluster.
It forwards endpoints with exposure values 'public' or 'internal' to a port on localhost.`,
Example: fmt.Sprintf(devExample, fullName), Example: fmt.Sprintf(devExample, fullName),
Args: cobra.MaximumNArgs(0), Args: cobra.MaximumNArgs(0),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
@@ -218,10 +278,34 @@ func NewCmdDev(name, fullName string) *cobra.Command {
}, },
} }
clientset.Add(devCmd, clientset.DEV, clientset.INIT) clientset.Add(devCmd, clientset.DEV, clientset.INIT, clientset.KUBERNETES)
// Add a defined annotation in order to appear in the help menu // Add a defined annotation in order to appear in the help menu
devCmd.Annotations["command"] = "utility" devCmd.Annotations["command"] = "utility"
devCmd.SetUsageTemplate(odoutil.CmdUsageTemplate) devCmd.SetUsageTemplate(odoutil.CmdUsageTemplate)
return devCmd return devCmd
} }
// portPairsFromContainerEndpoints assigns a port on localhost to each port in the provided containerEndpoints map
// it returns a map of the format "<container-name>":{"<local-port-1>:<remote-port-1>", "<local-port-2>:<remote-port-2>"}
// "container1": {"400001:3000", "400002:3001"}
func portPairsFromContainerEndpoints(ceMap map[string][]int) map[string][]string {
portPairs := make(map[string][]string)
port := 40000
for name, ports := range ceMap {
for _, p := range ports {
port++
for {
isPortFree := util.IsPortFree(port)
if isPortFree {
pair := fmt.Sprintf("%d:%d", port, p)
portPairs[name] = append(portPairs[name], pair)
break
}
port++
}
}
}
return portPairs
}

View File

@@ -10,6 +10,7 @@ import (
"hash/adler32" "hash/adler32"
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"net/url" "net/url"
"os" "os"
"os/signal" "os/signal"
@@ -760,3 +761,15 @@ func SafeGetBool(b *bool) bool {
func GetAdler32Value(s string) string { func GetAdler32Value(s string) string {
return fmt.Sprintf("%08x", adler32.Checksum([]byte(s))) return fmt.Sprintf("%08x", adler32.Checksum([]byte(s)))
} }
// IsPortFree checks if the port on localhost is free to use
func IsPortFree(port int) bool {
address := fmt.Sprintf("localhost:%d", port)
listener, err := net.Listen("tcp", address)
if err != nil {
return false
}
_ = listener.Addr().(*net.TCPAddr).Port
err = listener.Close()
return err == nil
}

View File

@@ -8,6 +8,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/devfile/library/pkg/devfile/parser"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/redhat-developer/odo/pkg/devfile/adapters/common" "github.com/redhat-developer/odo/pkg/devfile/adapters/common"
@@ -61,6 +63,8 @@ type WatchParameters struct {
DevfileRunCmd string DevfileRunCmd string
// DevfileDebugCmd takes the debug command through the command line and overwrites the devfile debug command // DevfileDebugCmd takes the debug command through the command line and overwrites the devfile debug command
DevfileDebugCmd string DevfileDebugCmd string
// InitialDevfileObj is used to compare the devfile between the very first run of odo dev and subsequent ones
InitialDevfileObj parser.DevfileObj
} }
// addRecursiveWatch handles adding watches recursively for the path provided // addRecursiveWatch handles adding watches recursively for the path provided
@@ -299,7 +303,7 @@ func (o *WatchClient) WatchAndPush(out io.Writer, parameters WatchParameters) er
if parameters.EnvSpecificInfo != nil && parameters.EnvSpecificInfo.GetRunMode() == envinfo.Debug { if parameters.EnvSpecificInfo != nil && parameters.EnvSpecificInfo.GetRunMode() == envinfo.Debug {
log.Finfof(out, "Component is running in debug mode\nPlease start port-forwarding in a different terminal.") log.Finfof(out, "Component is running in debug mode\nPlease start port-forwarding in a different terminal.")
} }
log.Finfof(out, "\nWaiting for something to change in %s\n\nPress Ctrl+c to exit.", parameters.Path) log.Finfof(out, "\nWatching for changes in the current directory %s\n\nPress Ctrl+c to exit.", parameters.Path)
showWaitingMessage = false showWaitingMessage = false
} }
// if a change happened more than 'delay' seconds ago, sync it now. // if a change happened more than 'delay' seconds ago, sync it now.
@@ -315,7 +319,7 @@ func (o *WatchClient) WatchAndPush(out io.Writer, parameters WatchParameters) er
fmt.Fprintf(out, "\n\nFile %s changed\n", file) fmt.Fprintf(out, "\n\nFile %s changed\n", file)
} }
if len(changedFiles) > 0 || len(deletedPaths) > 0 { if len(changedFiles) > 0 || len(deletedPaths) > 0 {
fmt.Fprintf(out, "Pushing files...\n") fmt.Fprintf(out, "Pushing files...\n\n")
fileInfo, err := os.Stat(parameters.Path) fileInfo, err := os.Stat(parameters.Path)
if err != nil { if err != nil {
return fmt.Errorf("%s: file doesn't exist: %w", parameters.Path, err) return fmt.Errorf("%s: file doesn't exist: %w", parameters.Path, err)

View File

@@ -70,3 +70,7 @@ mockgen -source=pkg/watch/interface.go \
mockgen -source=pkg/component/delete/interface.go \ mockgen -source=pkg/component/delete/interface.go \
-package delete \ -package delete \
-destination pkg/component/delete/mock.go -destination pkg/component/delete/mock.go
mockgen -source=pkg/dev/interface.go \
-package dev \
-destination pkg/dev/mock.go

View File

@@ -0,0 +1,67 @@
commands:
- exec:
commandLine: npm install
component: runtime
group:
isDefault: true
kind: build
workingDir: ${PROJECT_SOURCE}
id: install
- exec:
commandLine: npm start
component: runtime
group:
isDefault: true
kind: run
workingDir: ${PROJECT_SOURCE}
id: run
- exec:
commandLine: npm run debug
component: runtime
group:
isDefault: true
kind: debug
workingDir: ${PROJECT_SOURCE}
id: debug
- exec:
commandLine: npm test
component: runtime
group:
isDefault: true
kind: test
workingDir: ${PROJECT_SOURCE}
id: test
components:
- container:
endpoints:
- name: http-3000
targetPort: 3000
exposure: public
- name: http-4567
targetPort: 4567
exposure: internal
- name: http-7890
targetPort: 7890
exposure: none
image: registry.access.redhat.com/ubi8/nodejs-14:latest
memoryLimit: 1024Mi
mountSources: true
name: runtime
metadata:
description: Stack with Node.js 14
displayName: Node.js Runtime
icon: https://nodejs.org/static/images/logos/nodejs-new-pantone-black.svg
language: javascript
name: nodejs-ex
projectType: nodejs
tags:
- NodeJS
- Express
- ubi8
version: 1.0.1
schemaVersion: 2.0.0
starterProjects:
- git:
remotes:
origin: https://github.com/odo-devfiles/nodejs-ex.git
name: nodejs-starter

View File

@@ -0,0 +1,17 @@
{
"name": "nodejs-starter",
"version": "1.0.0",
"description": "Simple Node.js application",
"license": "EPL-2.0",
"scripts": {
"start": "node server.js",
"debug": "node --inspect-brk=${DEBUG_PORT} server.js",
"test": "node test/test.js"
},
"dependencies": {
"express": "^4.17.1",
"pino": "^6.2.1",
"pino-http": "^5.1.0",
"prom-client": "^12.0.0"
}
}

View File

@@ -0,0 +1,71 @@
const Prometheus = require('prom-client')
const express = require('express');
const http = require('http');
Prometheus.collectDefaultMetrics();
const requestHistogram = new Prometheus.Histogram({
name: 'http_request_duration_seconds',
help: 'Duration of HTTP requests in seconds',
labelNames: ['code', 'handler', 'method'],
buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5]
})
const requestTimer = (req, res, next) => {
const path = new URL(req.url, `http://${req.hostname}`).pathname
const stop = requestHistogram.startTimer({
method: req.method,
handler: path
})
res.on('finish', () => {
stop({
code: res.statusCode
})
})
next()
}
const app = express();
const server = http.createServer(app)
const server2 = http.createServer(app)
// See: http://expressjs.com/en/4x/api.html#app.settings.table
const PRODUCTION = app.get('env') === 'production';
// Administrative routes are not timed or logged, but for non-admin routes, pino
// overhead is included in timing.
app.get('/ready', (req, res) => res.status(200).json({status:"ok"}));
app.get('/live', (req, res) => res.status(200).json({status:"ok"}));
app.get('/metrics', (req, res, next) => {
res.set('Content-Type', Prometheus.register.contentType)
res.end(Prometheus.register.metrics())
})
// Time routes after here.
app.use(requestTimer);
// Log routes after here.
const pino = require('pino')({
level: PRODUCTION ? 'info' : 'debug',
});
app.use(require('pino-http')({logger: pino}));
app.get('/', (req, res) => {
// Use req.log (a `pino` instance) to log JSON:
req.log.info({message: 'Hello from Node.js Starter Application!'});
res.send('Hello from Node.js Starter Application!');
});
app.get('*', (req, res) => {
res.status(404).send("Not Found");
});
// Listen and serve.
const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
console.log(`App started on PORT ${PORT}`);
});
server2.listen(4567, () => {
console.log(`App started on PORT 4567`);
});

View File

@@ -0,0 +1,21 @@
var assert = require('assert');
(function(){
'use strict';
equal('should pass', function() {
assert(1 === 1);
});
function equal(desc, fn) {
try {
fn();
console.log('\x1b[32m%s\x1b[0m', '\u2714 ' + desc);
console.log("Add your tests in this ./test directory");
} catch (error) {
console.log('\n');
console.log('\x1b[31m%s\x1b[0m', '\u2718 ' + desc);
console.error(error);
}
}
})();

View File

@@ -1,6 +1,8 @@
package helper package helper
import ( import (
"regexp"
"github.com/onsi/gomega/gexec" "github.com/onsi/gomega/gexec"
) )
@@ -82,17 +84,32 @@ import (
}) })
}) })
*/ */
const localhostRegexp = "localhost:[0-9]*"
type DevSession struct { type DevSession struct {
session *gexec.Session session *gexec.Session
} }
// StartDevMode starts a dev session with `odo dev` // StartDevMode a session structure, the contents of the standard and error outputs, and the redirections endpoints to access ports opened by component
func StartDevMode() DevSession { // when the dev mode is completely started
func StartDevMode() (DevSession, []byte, []byte, []string, error) {
session := CmdRunner("odo", "dev") session := CmdRunner("odo", "dev")
WaitForOutputToContain("Waiting for something to change", 180, 10, session) WaitForOutputToContain("Watching for changes in the current directory", 180, 10, session)
return DevSession{ result := DevSession{
session: session, session: session,
} }
outContents := session.Out.Contents()
errContents := session.Err.Contents()
err := session.Out.Clear()
if err != nil {
return DevSession{}, nil, nil, nil, err
}
err = session.Err.Clear()
if err != nil {
return DevSession{}, nil, nil, nil, err
}
return result, outContents, errContents, FindAllMatchingStrings(string(outContents), localhostRegexp), nil
} }
// Kill a Dev session abruptly, without handling any cleanup // Kill a Dev session abruptly, without handling any cleanup
@@ -105,10 +122,21 @@ func (o DevSession) Stop() {
o.session.Interrupt() o.session.Interrupt()
} }
// RunDevMode runs a dev session and executes the `inside` code // RunDevMode runs a dev session and executes the `inside` code when the dev mode is completely started
func RunDevMode(inside func(session *gexec.Session)) { // The inside handler is passed the internal session pointer, the contents of the standard and error outputs,
session := StartDevMode() // and a slice of strings - urls - giving the redirections in the form localhost:<port_number> to access ports opened by component
func RunDevMode(inside func(session *gexec.Session, outContents []byte, errContents []byte, urls []string)) error {
session, outContents, errContents, urls, err := StartDevMode()
if err != nil {
return err
}
defer session.Stop() defer session.Stop()
WaitForOutputToContain("Waiting for something to change", 180, 10, session.session) inside(session.session, outContents, errContents, urls)
inside(session.session) return nil
}
// FindAllMatchingStrings returns all matches for the provided regExp as a slice of strings
func FindAllMatchingStrings(s, regExp string) []string {
re := regexp.MustCompile(regExp)
return re.FindAllString(s, -1)
} }

View File

@@ -40,6 +40,16 @@ func WaitForOutputToContain(substring string, timeoutInSeconds int, intervalInSe
} }
// WaitForErroutToContain waits for the session stdout output to contain a particular substring
func WaitForErroutToContain(substring string, timeoutInSeconds int, intervalInSeconds int, session *gexec.Session) {
Eventually(func() string {
contents := string(session.Err.Contents())
return contents
}, timeoutInSeconds, intervalInSeconds).Should(ContainSubstring(substring))
}
// WaitAndCheckForTerminatingState waits for the given interval // WaitAndCheckForTerminatingState waits for the given interval
// and checks if the given resource type has been deleted on the cluster or is in the terminating state // and checks if the given resource type has been deleted on the cluster or is in the terminating state
// path is the path to the program's binary // path is the path to the program's binary

View File

@@ -67,7 +67,9 @@ ComponentSettings:
When("the component is deployed in DEV mode and dev mode stopped", func() { When("the component is deployed in DEV mode and dev mode stopped", func() {
var devSession helper.DevSession var devSession helper.DevSession
BeforeEach(func() { BeforeEach(func() {
devSession = helper.StartDevMode() var err error
devSession, _, _, _, err = helper.StartDevMode()
Expect(err).ToNot(HaveOccurred())
defer devSession.Kill() defer devSession.Kill()
Expect(commonVar.CliRunner.Run(getDeployArgs...).Out.Contents()).To(ContainSubstring(cmpName)) Expect(commonVar.CliRunner.Run(getDeployArgs...).Out.Contents()).To(ContainSubstring(cmpName))
}) })

View File

@@ -2,15 +2,17 @@ package devfile
import ( import (
"fmt" "fmt"
"io"
"net/http"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/onsi/gomega/gexec"
"github.com/redhat-developer/odo/pkg/util" "github.com/redhat-developer/odo/pkg/util"
"github.com/onsi/gomega/gexec"
"github.com/redhat-developer/odo/tests/helper" "github.com/redhat-developer/odo/tests/helper"
) )
@@ -51,122 +53,120 @@ var _ = Describe("odo dev command tests", func() {
Expect(helper.VerifyFileExists(".odo/env/env.yaml")).To(BeFalse()) Expect(helper.VerifyFileExists(".odo/env/env.yaml")).To(BeFalse())
}) })
It("should show validation errors if the devfile is incorrect", func() { It("should show validation errors if the devfile is incorrect", func() {
helper.RunDevMode(func(session *gexec.Session) { err := helper.RunDevMode(func(session *gexec.Session, outContents, errContents []byte, urls []string) {
helper.ReplaceString(filepath.Join(commonVar.Context, "devfile.yaml"), "kind: run", "kind: build") helper.ReplaceString(filepath.Join(commonVar.Context, "devfile.yaml"), "kind: run", "kind: build")
helper.WaitForOutputToContain("Error occurred on Push", 180, 10, session) helper.WaitForOutputToContain("Error occurred on Push", 180, 10, session)
}) })
Expect(err).ToNot(HaveOccurred())
}) })
It("should use the index information from previous push operation", func() { It("should use the index information from previous push operation", func() {
// Create a new file A // Create a new file A
fileAPath, fileAText := helper.CreateSimpleFile(commonVar.Context, "my-file-", ".txt") fileAPath, fileAText := helper.CreateSimpleFile(commonVar.Context, "my-file-", ".txt")
// watch that project // watch that project
helper.RunDevMode(func(session *gexec.Session) { err := helper.RunDevMode(func(session *gexec.Session, outContents, errContents []byte, urls []string) {
// Change some other file B // Change some other file B
helper.ReplaceString(filepath.Join(commonVar.Context, "server.js"), "App started", "App is super started") helper.ReplaceString(filepath.Join(commonVar.Context, "server.js"), "App started", "App is super started")
podName := commonVar.CliRunner.GetRunningPodNameByComponent(cmpName, commonVar.Project) podName := commonVar.CliRunner.GetRunningPodNameByComponent(cmpName, commonVar.Project)
// File should exist, and its content should match what we initially set it to // File should exist, and its content should match what we initially set it to
execResult := commonVar.CliRunner.Exec(podName, commonVar.Project, "cat", "/projects/"+filepath.Base(fileAPath)) execResult := commonVar.CliRunner.Exec(podName, commonVar.Project, "cat", "/projects/"+filepath.Base(fileAPath))
Expect(execResult).To(ContainSubstring(fileAText)) Expect(execResult).To(ContainSubstring(fileAText))
}) })
Expect(err).ToNot(HaveOccurred())
}) })
It("ensure that index information is updated", func() { It("ensure that index information is updated", func() {
// watch that project err := helper.RunDevMode(func(session *gexec.Session, outContents, errContents []byte, urls []string) {
session := helper.CmdRunner("odo", "dev") indexAfterPush, err := util.ReadFileIndex(filepath.Join(commonVar.Context, ".odo", "odo-file-index.json"))
defer session.Kill() Expect(err).ToNot(HaveOccurred())
helper.WaitForOutputToContain("Waiting for something to change", 180, 10, session) // Create a new file A
indexAfterPush, err := util.ReadFileIndex(filepath.Join(commonVar.Context, ".odo", "odo-file-index.json")) fileAPath, _ := helper.CreateSimpleFile(commonVar.Context, "my-file-", ".txt")
Expect(err).ToNot(HaveOccurred())
// Create a new file A // Wait for the new file to exist in the index
fileAPath, _ := helper.CreateSimpleFile(commonVar.Context, "my-file-", ".txt") Eventually(func() bool {
// Wait for the new file to exist in the index newIndexAfterPush, readErr := util.ReadFileIndex(filepath.Join(commonVar.Context, ".odo", "odo-file-index.json"))
Eventually(func() bool { if readErr != nil {
fmt.Fprintln(GinkgoWriter, "New index not found or could not be read", readErr)
newIndexAfterPush, readErr := util.ReadFileIndex(filepath.Join(commonVar.Context, ".odo", "odo-file-index.json")) return false
if readErr != nil {
fmt.Fprintln(GinkgoWriter, "New index not found or could not be read", readErr)
return false
}
_, exists := newIndexAfterPush.Files[filepath.Base(fileAPath)]
if !exists {
fmt.Fprintln(GinkgoWriter, "path", fileAPath, "not found.", readErr)
}
return exists
}, 180, 10).Should(Equal(true))
// Delete file A and verify that it disappears from the index
err = os.Remove(fileAPath)
Expect(err).ToNot(HaveOccurred())
Eventually(func() bool {
newIndexAfterPush, err := util.ReadFileIndex(filepath.Join(commonVar.Context, ".odo", "odo-file-index.json"))
if err != nil {
fmt.Fprintln(GinkgoWriter, "New index not found or could not be read", err)
return false
}
// Sanity test: at least one file should be present
if len(newIndexAfterPush.Files) == 0 {
return false
}
// The fileA file should NOT be found
match := false
for relativeFilePath := range newIndexAfterPush.Files {
if strings.Contains(relativeFilePath, filepath.Base(fileAPath)) {
match = true
} }
}
return !match
}, 180, 10).Should(Equal(true)) _, exists := newIndexAfterPush.Files[filepath.Base(fileAPath)]
if !exists {
fmt.Fprintln(GinkgoWriter, "path", fileAPath, "not found.", readErr)
}
return exists
// Change server.js }, 180, 10).Should(Equal(true))
helper.ReplaceString(filepath.Join(commonVar.Context, "server.js"), "App started", "App is super started")
helper.WaitForOutputToContain("server.js", 180, 10, session)
// Wait for the size values in the old and new index files to differ, indicating that watch has updated the index // Delete file A and verify that it disappears from the index
Eventually(func() bool { err = os.Remove(fileAPath)
Expect(err).ToNot(HaveOccurred())
Eventually(func() bool {
newIndexAfterPush, err := util.ReadFileIndex(filepath.Join(commonVar.Context, ".odo", "odo-file-index.json")) newIndexAfterPush, err := util.ReadFileIndex(filepath.Join(commonVar.Context, ".odo", "odo-file-index.json"))
if err != nil { if err != nil {
fmt.Fprintln(GinkgoWriter, "New index not found or could not be read", err) fmt.Fprintln(GinkgoWriter, "New index not found or could not be read", err)
return false return false
} }
beforePushValue, exists := indexAfterPush.Files["server.js"] // Sanity test: at least one file should be present
if !exists { if len(newIndexAfterPush.Files) == 0 {
fmt.Fprintln(GinkgoWriter, "server.js not found in old index file") return false
return false }
}
afterPushValue, exists := newIndexAfterPush.Files["server.js"] // The fileA file should NOT be found
if !exists { match := false
fmt.Fprintln(GinkgoWriter, "server.js not found in new index file") for relativeFilePath := range newIndexAfterPush.Files {
return false
}
fmt.Fprintln(GinkgoWriter, "comparing old and new file sizes", beforePushValue.Size, afterPushValue.Size) if strings.Contains(relativeFilePath, filepath.Base(fileAPath)) {
match = true
}
}
return !match
return beforePushValue.Size != afterPushValue.Size }, 180, 10).Should(Equal(true))
}, 180, 10).Should(Equal(true)) // Change server.js
helper.ReplaceString(filepath.Join(commonVar.Context, "server.js"), "App started", "App is super started")
helper.WaitForOutputToContain("server.js", 180, 10, session)
// Wait for the size values in the old and new index files to differ, indicating that watch has updated the index
Eventually(func() bool {
newIndexAfterPush, err := util.ReadFileIndex(filepath.Join(commonVar.Context, ".odo", "odo-file-index.json"))
if err != nil {
fmt.Fprintln(GinkgoWriter, "New index not found or could not be read", err)
return false
}
beforePushValue, exists := indexAfterPush.Files["server.js"]
if !exists {
fmt.Fprintln(GinkgoWriter, "server.js not found in old index file")
return false
}
afterPushValue, exists := newIndexAfterPush.Files["server.js"]
if !exists {
fmt.Fprintln(GinkgoWriter, "server.js not found in new index file")
return false
}
fmt.Fprintln(GinkgoWriter, "comparing old and new file sizes", beforePushValue.Size, afterPushValue.Size)
return beforePushValue.Size != afterPushValue.Size
}, 180, 10).Should(Equal(true))
})
Expect(err).ToNot(HaveOccurred())
}) })
When("odo dev is executed", func() { When("odo dev is executed", func() {
BeforeEach(func() { BeforeEach(func() {
session := helper.CmdRunner("odo", "dev") devSession, _, _, _, err := helper.StartDevMode()
helper.WaitForOutputToContain("Waiting for something to change", 180, 10, session) Expect(err).ToNot(HaveOccurred())
defer session.Kill() defer devSession.Kill()
// An ENV file should have been created indicating current namespace // An ENV file should have been created indicating current namespace
Expect(helper.VerifyFileExists(".odo/env/env.yaml")).To(BeTrue()) Expect(helper.VerifyFileExists(".odo/env/env.yaml")).To(BeTrue())
helper.FileShouldContainSubstring(".odo/env/env.yaml", "Project: "+commonVar.Project) helper.FileShouldContainSubstring(".odo/env/env.yaml", "Project: "+commonVar.Project)
@@ -191,18 +191,103 @@ var _ = Describe("odo dev command tests", func() {
}) })
It("should run odo dev on initial namespace", func() { It("should run odo dev on initial namespace", func() {
session := helper.CmdRunner("odo", "dev") err := helper.RunDevMode(func(session *gexec.Session, outContents, errContents []byte, urls []string) {
helper.WaitForOutputToContain("Waiting for something to change", 180, 10, session) output := commonVar.CliRunner.Run("get", "deployment").Err.Contents()
defer session.Kill() Expect(string(output)).To(ContainSubstring("No resources found in " + otherNS + " namespace."))
output := commonVar.CliRunner.Run("get", "deployment").Err.Contents() output = commonVar.CliRunner.Run("get", "deployment", "-n", commonVar.Project).Out.Contents()
Expect(string(output)).To(ContainSubstring("No resources found in " + otherNS + " namespace.")) Expect(string(output)).To(ContainSubstring(cmpName))
})
output = commonVar.CliRunner.Run("get", "deployment", "-n", commonVar.Project).Out.Contents() Expect(err).ToNot(HaveOccurred())
Expect(string(output)).To(ContainSubstring(cmpName))
}) })
}) })
}) })
})
Context("port-forwarding for the component", func() {
When("devfile has single endpoint", func() {
BeforeEach(func() {
helper.CopyExample(filepath.Join("source", "devfiles", "nodejs", "project"), commonVar.Context)
helper.Cmd("odo", "project", "set", commonVar.Project).ShouldPass()
helper.Cmd("odo", "init", "--name", cmpName, "--devfile-path", helper.GetExamplePath("source", "devfiles", "nodejs", "devfile.yaml")).ShouldPass()
})
It("should expose the endpoint on localhost", func() {
err := helper.RunDevMode(func(session *gexec.Session, outContents, errContents []byte, urls []string) {
url := fmt.Sprintf("http://%s", urls[0])
resp, err := http.Get(url)
Expect(err).ToNot(HaveOccurred())
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
helper.MatchAllInOutput(string(body), []string{"Hello from Node.js Starter Application!"})
})
Expect(err).ToNot(HaveOccurred())
})
})
When("devfile has multiple endpoints", func() {
BeforeEach(func() {
helper.CopyExample(filepath.Join("source", "devfiles", "nodejs", "project-with-multiple-endpoints"), commonVar.Context)
helper.Cmd("odo", "project", "set", commonVar.Project).ShouldPass()
helper.Cmd("odo", "init", "--name", cmpName, "--devfile-path", helper.GetExamplePath("source", "devfiles", "nodejs", "devfile-with-multiple-endpoints.yaml")).ShouldPass()
})
It("should expose two endpoints on localhost", func() {
err := helper.RunDevMode(func(session *gexec.Session, outContents, errContents []byte, urls []string) {
url1 := fmt.Sprintf("http://%s", urls[0])
url2 := fmt.Sprintf("http://%s", urls[1])
resp1, err := http.Get(url1)
Expect(err).ToNot(HaveOccurred())
defer resp1.Body.Close()
resp2, err := http.Get(url2)
Expect(err).ToNot(HaveOccurred())
defer resp2.Body.Close()
body1, _ := io.ReadAll(resp1.Body)
helper.MatchAllInOutput(string(body1), []string{"Hello from Node.js Starter Application!"})
body2, _ := io.ReadAll(resp2.Body)
helper.MatchAllInOutput(string(body2), []string{"Hello from Node.js Starter Application!"})
helper.ReplaceString("server.js", "Hello from Node.js", "H3110 from Node.js")
helper.WaitForOutputToContain("Watching for changes in the current directory", 180, 10, session)
Eventually(func() bool {
resp3, err := http.Get(url1)
if err != nil {
return false
}
defer resp3.Body.Close()
resp4, err := http.Get(url2)
if err != nil {
return false
}
defer resp4.Body.Close()
body3, _ := io.ReadAll(resp3.Body)
if string(body3) != "H3110 from Node.js Starter Application!" {
return false
}
body4, _ := io.ReadAll(resp4.Body)
return string(body4) == "H3110 from Node.js Starter Application!"
}, 180, 10).Should(Equal(true))
})
Expect(err).ToNot(HaveOccurred())
})
When("an endpoint is added after first run of odo dev", func() {
It("should print the message to run odo dev again", func() {
err := helper.RunDevMode(func(session *gexec.Session, outContents, errContents []byte, urls []string) {
helper.ReplaceString("devfile.yaml", "exposure: none", "exposure: public")
helper.WaitForErroutToContain("devfile.yaml has been changed; please restart the `odo dev` command", 180, 10, session)
})
Expect(err).ToNot(HaveOccurred())
})
})
})
}) })
}) })

View File

@@ -83,7 +83,9 @@ var _ = Describe("odo list with devfile", func() {
When("the component is pushed in dev mode", func() { When("the component is pushed in dev mode", func() {
var devSession helper.DevSession var devSession helper.DevSession
BeforeEach(func() { BeforeEach(func() {
devSession = helper.StartDevMode() var err error
devSession, _, _, _, err = helper.StartDevMode()
Expect(err).ToNot(HaveOccurred())
}) })
AfterEach(func() { AfterEach(func() {
devSession.Stop() devSession.Stop()

19
vendor/k8s.io/client-go/tools/portforward/doc.go generated vendored Normal file
View File

@@ -0,0 +1,19 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package portforward adds support for SSH-like port forwarding from the client's
// local host to remote containers.
package portforward // import "k8s.io/client-go/tools/portforward"

View File

@@ -0,0 +1,429 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portforward
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/runtime"
)
// PortForwardProtocolV1Name is the subprotocol used for port forwarding.
// TODO move to API machinery and re-unify with kubelet/server/portfoward
const PortForwardProtocolV1Name = "portforward.k8s.io"
// PortForwarder knows how to listen for local connections and forward them to
// a remote pod via an upgraded HTTP request.
type PortForwarder struct {
addresses []listenAddress
ports []ForwardedPort
stopChan <-chan struct{}
dialer httpstream.Dialer
streamConn httpstream.Connection
listeners []io.Closer
Ready chan struct{}
requestIDLock sync.Mutex
requestID int
out io.Writer
errOut io.Writer
}
// ForwardedPort contains a Local:Remote port pairing.
type ForwardedPort struct {
Local uint16
Remote uint16
}
/*
valid port specifications:
5000
- forwards from localhost:5000 to pod:5000
8888:5000
- forwards from localhost:8888 to pod:5000
0:5000
:5000
- selects a random available local port,
forwards from localhost:<random port> to pod:5000
*/
func parsePorts(ports []string) ([]ForwardedPort, error) {
var forwards []ForwardedPort
for _, portString := range ports {
parts := strings.Split(portString, ":")
var localString, remoteString string
if len(parts) == 1 {
localString = parts[0]
remoteString = parts[0]
} else if len(parts) == 2 {
localString = parts[0]
if localString == "" {
// support :5000
localString = "0"
}
remoteString = parts[1]
} else {
return nil, fmt.Errorf("invalid port format '%s'", portString)
}
localPort, err := strconv.ParseUint(localString, 10, 16)
if err != nil {
return nil, fmt.Errorf("error parsing local port '%s': %s", localString, err)
}
remotePort, err := strconv.ParseUint(remoteString, 10, 16)
if err != nil {
return nil, fmt.Errorf("error parsing remote port '%s': %s", remoteString, err)
}
if remotePort == 0 {
return nil, fmt.Errorf("remote port must be > 0")
}
forwards = append(forwards, ForwardedPort{uint16(localPort), uint16(remotePort)})
}
return forwards, nil
}
type listenAddress struct {
address string
protocol string
failureMode string
}
func parseAddresses(addressesToParse []string) ([]listenAddress, error) {
var addresses []listenAddress
parsed := make(map[string]listenAddress)
for _, address := range addressesToParse {
if address == "localhost" {
if _, exists := parsed["127.0.0.1"]; !exists {
ip := listenAddress{address: "127.0.0.1", protocol: "tcp4", failureMode: "all"}
parsed[ip.address] = ip
}
if _, exists := parsed["::1"]; !exists {
ip := listenAddress{address: "::1", protocol: "tcp6", failureMode: "all"}
parsed[ip.address] = ip
}
} else if net.ParseIP(address).To4() != nil {
parsed[address] = listenAddress{address: address, protocol: "tcp4", failureMode: "any"}
} else if net.ParseIP(address) != nil {
parsed[address] = listenAddress{address: address, protocol: "tcp6", failureMode: "any"}
} else {
return nil, fmt.Errorf("%s is not a valid IP", address)
}
}
addresses = make([]listenAddress, len(parsed))
id := 0
for _, v := range parsed {
addresses[id] = v
id++
}
// Sort addresses before returning to get a stable order
sort.Slice(addresses, func(i, j int) bool { return addresses[i].address < addresses[j].address })
return addresses, nil
}
// New creates a new PortForwarder with localhost listen addresses.
func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
return NewOnAddresses(dialer, []string{"localhost"}, ports, stopChan, readyChan, out, errOut)
}
// NewOnAddresses creates a new PortForwarder with custom listen addresses.
func NewOnAddresses(dialer httpstream.Dialer, addresses []string, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
if len(addresses) == 0 {
return nil, errors.New("you must specify at least 1 address")
}
parsedAddresses, err := parseAddresses(addresses)
if err != nil {
return nil, err
}
if len(ports) == 0 {
return nil, errors.New("you must specify at least 1 port")
}
parsedPorts, err := parsePorts(ports)
if err != nil {
return nil, err
}
return &PortForwarder{
dialer: dialer,
addresses: parsedAddresses,
ports: parsedPorts,
stopChan: stopChan,
Ready: readyChan,
out: out,
errOut: errOut,
}, nil
}
// ForwardPorts formats and executes a port forwarding request. The connection will remain
// open until stopChan is closed.
func (pf *PortForwarder) ForwardPorts() error {
defer pf.Close()
var err error
pf.streamConn, _, err = pf.dialer.Dial(PortForwardProtocolV1Name)
if err != nil {
return fmt.Errorf("error upgrading connection: %s", err)
}
defer pf.streamConn.Close()
return pf.forward()
}
// forward dials the remote host specific in req, upgrades the request, starts
// listeners for each port specified in ports, and forwards local connections
// to the remote host via streams.
func (pf *PortForwarder) forward() error {
var err error
listenSuccess := false
for i := range pf.ports {
port := &pf.ports[i]
err = pf.listenOnPort(port)
switch {
case err == nil:
listenSuccess = true
default:
if pf.errOut != nil {
fmt.Fprintf(pf.errOut, "Unable to listen on port %d: %v\n", port.Local, err)
}
}
}
if !listenSuccess {
return fmt.Errorf("unable to listen on any of the requested ports: %v", pf.ports)
}
if pf.Ready != nil {
close(pf.Ready)
}
// wait for interrupt or conn closure
select {
case <-pf.stopChan:
case <-pf.streamConn.CloseChan():
runtime.HandleError(errors.New("lost connection to pod"))
}
return nil
}
// listenOnPort delegates listener creation and waits for connections on requested bind addresses.
// An error is raised based on address groups (default and localhost) and their failure modes
func (pf *PortForwarder) listenOnPort(port *ForwardedPort) error {
var errors []error
failCounters := make(map[string]int, 2)
successCounters := make(map[string]int, 2)
for _, addr := range pf.addresses {
err := pf.listenOnPortAndAddress(port, addr.protocol, addr.address)
if err != nil {
errors = append(errors, err)
failCounters[addr.failureMode]++
} else {
successCounters[addr.failureMode]++
}
}
if successCounters["all"] == 0 && failCounters["all"] > 0 {
return fmt.Errorf("%s: %v", "Listeners failed to create with the following errors", errors)
}
if failCounters["any"] > 0 {
return fmt.Errorf("%s: %v", "Listeners failed to create with the following errors", errors)
}
return nil
}
// listenOnPortAndAddress delegates listener creation and waits for new connections
// in the background f
func (pf *PortForwarder) listenOnPortAndAddress(port *ForwardedPort, protocol string, address string) error {
listener, err := pf.getListener(protocol, address, port)
if err != nil {
return err
}
pf.listeners = append(pf.listeners, listener)
go pf.waitForConnection(listener, *port)
return nil
}
// getListener creates a listener on the interface targeted by the given hostname on the given port with
// the given protocol. protocol is in net.Listen style which basically admits values like tcp, tcp4, tcp6
func (pf *PortForwarder) getListener(protocol string, hostname string, port *ForwardedPort) (net.Listener, error) {
listener, err := net.Listen(protocol, net.JoinHostPort(hostname, strconv.Itoa(int(port.Local))))
if err != nil {
return nil, fmt.Errorf("unable to create listener: Error %s", err)
}
listenerAddress := listener.Addr().String()
host, localPort, _ := net.SplitHostPort(listenerAddress)
localPortUInt, err := strconv.ParseUint(localPort, 10, 16)
if err != nil {
fmt.Fprintf(pf.out, "Failed to forward from %s:%d -> %d\n", hostname, localPortUInt, port.Remote)
return nil, fmt.Errorf("error parsing local port: %s from %s (%s)", err, listenerAddress, host)
}
port.Local = uint16(localPortUInt)
if pf.out != nil {
fmt.Fprintf(pf.out, "Forwarding from %s -> %d\n", net.JoinHostPort(hostname, strconv.Itoa(int(localPortUInt))), port.Remote)
}
return listener, nil
}
// waitForConnection waits for new connections to listener and handles them in
// the background.
func (pf *PortForwarder) waitForConnection(listener net.Listener, port ForwardedPort) {
for {
conn, err := listener.Accept()
if err != nil {
// TODO consider using something like https://github.com/hydrogen18/stoppableListener?
if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") {
runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err))
}
return
}
go pf.handleConnection(conn, port)
}
}
func (pf *PortForwarder) nextRequestID() int {
pf.requestIDLock.Lock()
defer pf.requestIDLock.Unlock()
id := pf.requestID
pf.requestID++
return id
}
// handleConnection copies data between the local connection and the stream to
// the remote server.
func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
defer conn.Close()
if pf.out != nil {
fmt.Fprintf(pf.out, "Handling connection for %d\n", port.Local)
}
requestID := pf.nextRequestID()
// create error stream
headers := http.Header{}
headers.Set(v1.StreamType, v1.StreamTypeError)
headers.Set(v1.PortHeader, fmt.Sprintf("%d", port.Remote))
headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID))
errorStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
runtime.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err))
return
}
// we're not writing to this stream
errorStream.Close()
errorChan := make(chan error)
go func() {
message, err := ioutil.ReadAll(errorStream)
switch {
case err != nil:
errorChan <- fmt.Errorf("error reading from error stream for port %d -> %d: %v", port.Local, port.Remote, err)
case len(message) > 0:
errorChan <- fmt.Errorf("an error occurred forwarding %d -> %d: %v", port.Local, port.Remote, string(message))
}
close(errorChan)
}()
// create data stream
headers.Set(v1.StreamType, v1.StreamTypeData)
dataStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err))
return
}
localError := make(chan struct{})
remoteDone := make(chan struct{})
go func() {
// Copy from the remote side to the local port.
if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
}
// inform the select below that the remote copy is done
close(remoteDone)
}()
go func() {
// inform server we're not sending any more data after copy unblocks
defer dataStream.Close()
// Copy from the local port to the remote side.
if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
// break out of the select below without waiting for the other copy to finish
close(localError)
}
}()
// wait for either a local->remote error or for copying from remote->local to finish
select {
case <-remoteDone:
case <-localError:
}
// always expect something on errorChan (it may be nil)
err = <-errorChan
if err != nil {
runtime.HandleError(err)
}
}
// Close stops all listeners of PortForwarder.
func (pf *PortForwarder) Close() {
// stop all listeners
for _, l := range pf.listeners {
if err := l.Close(); err != nil {
runtime.HandleError(fmt.Errorf("error closing listener: %v", err))
}
}
}
// GetPorts will return the ports that were forwarded; this can be used to
// retrieve the locally-bound port in cases where the input was port 0. This
// function will signal an error if the Ready channel is nil or if the
// listeners are not ready yet; this function will succeed after the Ready
// channel has been closed.
func (pf *PortForwarder) GetPorts() ([]ForwardedPort, error) {
if pf.Ready == nil {
return nil, fmt.Errorf("no Ready channel provided")
}
select {
case <-pf.Ready:
return pf.ports, nil
default:
return nil, fmt.Errorf("listeners not ready")
}
}

1
vendor/modules.txt vendored
View File

@@ -1085,6 +1085,7 @@ k8s.io/client-go/tools/leaderelection
k8s.io/client-go/tools/leaderelection/resourcelock k8s.io/client-go/tools/leaderelection/resourcelock
k8s.io/client-go/tools/metrics k8s.io/client-go/tools/metrics
k8s.io/client-go/tools/pager k8s.io/client-go/tools/pager
k8s.io/client-go/tools/portforward
k8s.io/client-go/tools/record k8s.io/client-go/tools/record
k8s.io/client-go/tools/record/util k8s.io/client-go/tools/record/util
k8s.io/client-go/tools/reference k8s.io/client-go/tools/reference