Skip to content

Instantly share code, notes, and snippets.

Created June 21, 2022 17:57
Show Gist options
  • Save morganmcg1/d40cdf9ec0f4a88a8bf028b63e768978 to your computer and use it in GitHub Desktop.
Save morganmcg1/d40cdf9ec0f4a88a8bf028b63e768978 to your computer and use it in GitHub Desktop.
Code for DBPlot
# Decision Boundary Utils
-# Modified from
-class DBPlot(BaseEstimator):
- def __init__(
- self,
- estimator=KNeighborsClassifier(n_neighbors=10),
- acceptance_threshold=0.03,
- n_decision_boundary_keypoints=60,
- n_connecting_keypoints=None,
- n_interpolated_keypoints=None,
- n_generated_testpoints_per_keypoint=15,
- linear_iteration_budget=100,
- hypersphere_iteration_budget=300,
- verbose=False,
- ):
- if acceptance_threshold == 0:
- raise Warning(
- "A nonzero acceptance threshold is strongly recommended so the optimizer can finish in finite time"
- )
- if linear_iteration_budget < 2 or hypersphere_iteration_budget < 2:
- raise Exception("Invalid iteration budget")
- self.classifier = estimator
- self.acceptance_threshold = acceptance_threshold
- if (
- n_decision_boundary_keypoints
- and n_connecting_keypoints
- and n_interpolated_keypoints
- and n_connecting_keypoints + n_interpolated_keypoints
- != n_decision_boundary_keypoints
- ):
- raise Exception(
them to None to use calculated suggestions)"
- )
- self.n_connecting_keypoints = (
- n_connecting_keypoints
- if n_connecting_keypoints != None
- else n_decision_boundary_keypoints / 3
- )
- self.n_interpolated_keypoints = (
- n_interpolated_keypoints
- if n_interpolated_keypoints != None
- else n_decision_boundary_keypoints * 2 / 3
- )
- self.linear_iteration_budget = linear_iteration_budget
- self.n_generated_testpoints_per_keypoint = n_generated_testpoints_per_keypoint
- self.hypersphere_iteration_budget = hypersphere_iteration_budget
- self.verbose = verbose
- self.decision_boundary_points = []
- self.decision_boundary_points_2d = []
- self.X_testpoints = []
- self.y_testpoints = []
- self.train_idx = []
- self.test_idx = []
- self.background = []
- self.steps = 3
self.hypersphere_max_retry_budget = 20
- self.penalties_enabled = True
- self.random_gap_selection = False
- def setclassifier(self, estimator=KNeighborsClassifier(n_neighbors=10)):
- self.classifier = estimator
- def fit(self, X, y, training_indices=None):
- if set(np.array(y, dtype=int).tolist()) != set([0, 1]):
- raise Exception(
- "Currently only implemented for binary classification. Make sure you pass in two classes (0 and 1)"
- )
- if training_indices == None:
- train_idx = range(len(y))
- else:
- train_idx = training_indices
- self.X = X
- self.y = y
- self.train_idx = train_idx
- # self.test_idx = np.setdiff1d(np.arange(len(y)), self.train_idx, assume_unique=False)
- self.test_idx = list(set(range(len(y))).difference(set(self.train_idx)))
- # fit classifier if necessary
- try:
- self.classifier.predict([X[0]])
- except:
-, y)
- self.y_pred = self.classifier.predict(self.X)
- # fit DR method if necessary
- try:
- dimensionality_reduction.transform([X[0]])
- except:
- raise Exception(
bt, use PCA or Isomap"
- )
- # transform data
- self.X2d = dimensionality_reduction.transform(self.X)
- self.mean_2d_dist = np.mean(pdist(self.X2d))
- self.X2d_xmin, self.X2d_xmax = np.min(self.X2d[:, 0]), np.max(self.X2d[:, 0])
- self.X2d_ymin, self.X2d_ymax = np.min(self.X2d[:, 1]), np.max(self.X2d[:, 1])
- self.majorityclass = 0 if list(y).count(0) > list(y).count(1) else 1
- self.minorityclass = 1 - self.majorityclass
- minority_idx, majority_idx = (
- np.where(y == self.minorityclass)[0],
- np.where(y == self.majorityclass)[0],
- )
- self.Xminor, self.Xmajor = X[minority_idx], X[majority_idx]
- self.Xminor2d, self.Xmajor2d = self.X2d[minority_idx], self.X2d[majority_idx]
- # set up efficient nearest neighbor models for later use
- self.nn_model_2d_majorityclass = NearestNeighbors(n_neighbors=2)
-[majority_idx, :])
- self.nn_model_2d_minorityclass = NearestNeighbors(n_neighbors=2)
-[minority_idx, :])
- # step 1. look for decision boundary points between corners of majority &
- # minority class distribution
- minority_corner_idx, majority_corner_idx = [], []
- for extremum1 in [np.min, np.max]:
- for extremum2 in [np.min, np.max]:
- _, idx = self.nn_model_2d_minorityclass.kneighbors(
- [[extremum1(self.Xminor2d[:, 0]), extremum2(self.Xminor2d[:, 1])]]
- )
- minority_corner_idx.append(idx[0][0])
- _, idx = self.nn_model_2d_majorityclass.kneighbors(
- [[extremum1(self.Xmajor2d[:, 0]), extremum2(self.Xmajor2d[:, 1])]]
- )
- majority_corner_idx.append(idx[0][0])
- # optimize to find new db keypoints between corners
- self._linear_decision_boundary_optimization(
- minority_corner_idx, majority_corner_idx, all_combinations=True, step=1
- )
- # step 2. look for decision boundary points on lines connecting randomly
- # sampled points of majority & minority class
- n_samples = int(self.n_connecting_keypoints)
- from_idx = list(random.sample(list(np.arange(len(self.Xminor))), n_samples))
- to_idx = list(random.sample(list(np.arange(len(self.Xmajor))), n_samples))
- # optimize to find new db keypoints between minority and majority class
- self._linear_decision_boundary_optimization(
- from_idx, to_idx, all_combinations=False, step=2
- )
- if len(self.decision_boundary_points_2d) < 2:
- wandb.termerror(
(usually requires an even number of estimators/neighbors/etc)."
- )
- return, y, training_indices)
- # step 3. look for decision boundary points between already known db
- # points that are too distant (search on connecting line first, then on
- # surrounding hypersphere surfaces)
- edges, gap_distances, gap_probability_scores = (
- self._get_sorted_db_keypoint_distances()
- ) # find gaps
- self.nn_model_decision_boundary_points = NearestNeighbors(n_neighbors=2)
- i = 0
- retries = 0
while i < self.n_interpolated_keypoints:
- if self.random_gap_selection:
- # randomly sample from sorted DB keypoint gaps?
- gap_idx = np.random.choice(
- len(gap_probability_scores), 1, p=gap_probability_scores
- )[0]
- else:
- # get largest gap
- gap_idx = 0
- from_point = self.decision_boundary_points[edges[gap_idx][0]]
- to_point = self.decision_boundary_points[edges[gap_idx][1]]
- # optimize to find new db keypoint along line connecting two db keypoints
- # with large gap
- db_point = self._find_decision_boundary_along_line(
- from_point, to_point, penalize_tangent_distance=self.penalties_enabled
- )
- if self.decision_boundary_distance(db_point) > self.acceptance_threshold:
- if self.verbose:
- wandb.termerror(
face around known decision boundary point"
- )
- # hypersphere radius half the distance between from and to db keypoints
- R = euclidean(from_point, to_point) / 2.0
- # search around either source or target keypoint, with 0.5 probability,
- # hoping to find decision boundary in between
- if random.random() > 0.5:
- from_point = to_point
- # optimize to find new db keypoint on hypersphere surphase around known keypoint
- db_point = self._find_decision_boundary_on_hypersphere(from_point, R)
- if (
- self.decision_boundary_distance(db_point)
- <= self.acceptance_threshold
- ):
- db_point2d = dimensionality_reduction.transform([db_point])[0]
- self.decision_boundary_points.append(db_point)
- self.decision_boundary_points_2d.append(db_point2d)
- i += 1
- retries = 0
- else:
- retries += 1
- if retries > self.hypersphere_max_retry_budget:
- i += 1
- dist = self.decision_boundary_distance(db_point)
- wandb.termerror(msg.format(dist, self.hypersphere_max_retry_budget))
- elif self.verbose:
- dist = self.decision_boundary_distance(db_point)
- wandb.termerror(
- "Found point is too distant from decision boundary ({}) retrying...".format(
- dist
- )
- )
- db_point2d = dimensionality_reduction.transform([db_point])[0]
- self.decision_boundary_points.append(db_point)
- self.decision_boundary_points_2d.append(db_point2d)
- i += 1
- retries = 0
- edges, gap_distances, gap_probability_scores = (
- self._get_sorted_db_keypoint_distances()
- ) # reload gaps
- self.decision_boundary_points = np.array(self.decision_boundary_points)
- self.decision_boundary_points_2d = np.array(self.decision_boundary_points_2d)
- return self
- def plot(self):
- # decision boundary
- decision_boundary_x = self.decision_boundary_points_2d[:, 0]
- decision_boundary_y = self.decision_boundary_points_2d[:, 1]
- decision_boundary_color = "Decision Boundary"
- # training data
- train_x = self.X2d[self.train_idx, 0]
- train_y = self.X2d[self.train_idx, 1]
- train_color = ["Class 1 - Train Set"
- if self.y_pred[self.train_idx[i]] == self.y[self.train_idx[i]] == 1
- else (
- "Class 2 - Train Set"
- if self.y_pred[self.train_idx[i]] == self.y[self.train_idx[i]] == 0
- else "Misclassified")
- for i in range(len(self.train_idx))]
- # testing data
- test_x = self.X2d[self.test_idx, 0]
- test_y = self.X2d[self.test_idx, 1]
- test_color = ["Class 1 - Test Set"
- if self.y_pred[self.test_idx[i]] == self.y[self.test_idx[i]] == 1
- else (
- "Class 2 - Test Set"
- if self.y_pred[self.test_idx[i]] == self.y[self.test_idx[i]] == 0
- else "Misclassified")
- for i in range(len(self.test_idx))]
- return (
- decision_boundary_x,
- decision_boundary_y,
- decision_boundary_color,
- train_x,
- train_y,
- train_color,
- test_x,
- test_y,
- test_color
- )
- def decision_boundary_distance(self, x, grad=0):
- return np.abs(0.5 - self.classifier.predict_proba([x])[0][1])
- def get_decision_boundary_keypoints(self):
- if len(self.decision_boundary_points) == 0:
- raise Exception("Please call the fit method first!")
- return self.decision_boundary_points, self.decision_boundary_points_2d
def _get_sorted_db_keypoint_distances(self, N=None):
- if N == None:
- N = self.n_interpolated_keypoints
- edges = minimum_spanning_tree(
- squareform(pdist(self.decision_boundary_points_2d))
- )
- edged = np.array(
- [
- euclidean(
- self.decision_boundary_points_2d[u],
- self.decision_boundary_points_2d[v],
- )
- for u, v in edges
- ]
- )
- gap_edge_idx = np.argsort(edged)[::-1][: int(N)]
- edges = edges[gap_edge_idx]
- gap_distances = np.square(edged[gap_edge_idx])
- gap_probability_scores = gap_distances / np.sum(gap_distances)
- return edges, gap_distances, gap_probability_scores
- def _linear_decision_boundary_optimization(
- self,
- from_idx,
- to_idx,
- all_combinations=True,
- retry_neighbor_if_failed=True,
- step=None,
- suppress_output=True,
- ):
- step_str = (
- ("Step " + str(step) + "/" + str(self.steps) + ":") if step != None else ""
- )
- retries = 4 if retry_neighbor_if_failed else 1
- for i in range(len(from_idx)):
- n = len(to_idx) if all_combinations else 1
- for j in range(n):
- from_i = from_idx[i]
- to_i = to_idx[j] if all_combinations else to_idx[i]
- for k in range(retries):
- if k == 0:
- from_point = self.Xminor[from_i]
- to_point = self.Xmajor[to_i]
- else:
- # first attempt failed, try nearest neighbors of source and destination
- # point instead
- _, idx = self.nn_model_2d_minorityclass.kneighbors(
- [self.Xminor2d[from_i]]
- )
- from_point = self.Xminor[idx[0][k // 2]]
- _, idx = self.nn_model_2d_minorityclass.kneighbors(
- [self.Xmajor2d[to_i]]
- )
- to_point = self.Xmajor[idx[0][k % 2]]
- if euclidean(from_point, to_point) == 0:
- break # no decision boundary between equivalent points
- db_point = self._find_decision_boundary_along_line(
- from_point,
- to_point,
- penalize_tangent_distance=self.penalties_enabled,
- penalize_extremes=self.penalties_enabled,
- )
- if (
- self.decision_boundary_distance(db_point)
- <= self.acceptance_threshold
- ):
- db_point2d = dimensionality_reduction.transform(
- [db_point]
- )[0]
- if (
- db_point2d[0] >= self.X2d_xmin
- and db_point2d[0] <= self.X2d_xmax
- and db_point2d[1] >= self.X2d_ymin
- and db_point2d[1] <= self.X2d_ymax
- ):
- self.decision_boundary_points.append(db_point)
- self.decision_boundary_points_2d.append(db_point2d)
- break
- else:
- if self.verbose and not suppress_output:
- msg = "{} {}/{}: Rejected decision boundary keypoint (outside of plot area)"
- def _find_decision_boundary_along_line(
- self,
- from_point,
- to_point,
- penalize_extremes=False,
- penalize_tangent_distance=False,
- ):
- def objective(l, grad=0):
- # interpolate between source and destionation; calculate distance from
- # decision boundary
- X = from_point + l[0] * (to_point - from_point)
- error = self.decision_boundary_distance(X)
- if penalize_tangent_distance:
- # distance from tangent between class1 and class0 point in 2d space
- x0, y0 = dimensionality_reduction.transform([X])[0]
- x1, y1 = dimensionality_reduction.transform([from_point])[0]
- x2, y2 = dimensionality_reduction.transform([to_point])[0]
- error += (
- 1e-12
- * np.abs((y2 - y1) * x0 - (x2 - x1) * y0 + x2 * y1 - y2 * x1)
- / np.sqrt((y2 - y1) ** 2 + (x2 - x1) ** 2)
- )
- if penalize_extremes:
- error += 1e-8 * np.abs(0.5 - l[0])
- return error
- optimizer = self._get_optimizer()
- optimizer.set_min_objective(objective)
- cl = optimizer.optimize([random.random()])
- db_point = from_point + cl[0] * (to_point - from_point)
- cl = optimizer.optimize([random.random()])
- db_point = from_point + cl[0] * (to_point - from_point)
- return db_point
- def _find_decision_boundary_on_hypersphere(self, centroid, R, penalize_known=False):
- def objective(phi, grad=0):
- # search on hypersphere surface in polar coordinates - map back to cartesian
- cx = centroid + polar_to_cartesian(phi, R)
- try:
- cx2d = dimensionality_reduction.transform([cx])[0]
- error = self.decision_boundary_distance(cx)
- if penalize_known:
- # slight penalty for being too close to already known decision boundary
- # keypoints
- db_distances = [
- euclidean(cx2d, self.decision_boundary_points_2d[k])
- for k in range(len(self.decision_boundary_points_2d))
- ]
- error += (
- 1e-8
- * (
- (self.mean_2d_dist - np.min(db_distances))
- / self.mean_2d_dist
- )
- ** 2
- )
- return error
- except Exception as ex:
- wandb.termerror("Error in objective function:", ex)
- return np.infty
- optimizer = self._get_optimizer(
- D=self.X.shape[1] - 1,
- upper_bound=2 * np.pi,
- iteration_budget=self.hypersphere_iteration_budget,
- )
- optimizer.set_min_objective(objective)
- db_phi = optimizer.optimize(
- [random.random() * 2 * np.pi for k in range(self.X.shape[1] - 1)]
- )
- db_point = centroid + polar_to_cartesian(db_phi, R)
- return db_point
- def _get_optimizer(self, D=1, upper_bound=1, iteration_budget=None):
- if iteration_budget == None:
- iteration_budget = self.linear_iteration_budget
- '''
- opt = nlopt.opt(nlopt.GN_DIRECT_L_RAND, D)
- opt.set_stopval(self.acceptance_threshold/10.0)
- opt.set_ftol_rel(1e-5)
- opt.set_maxeval(iteration_budget)
- opt.set_lower_bounds(0)
- opt.set_upper_bounds(upper_bound)
- return opt
- '''
-ddef minimum_spanning_tree(X, copy_X=True):
- if copy_X:
- X = X.copy()
- if X.shape[0] != X.shape[1]:
- raise ValueError("X needs to be square matrix of edge weights")
- n_vertices = X.shape[0]
- spanning_edges = []
- # initialize with node 0:
- visited_vertices = [0]
- num_visited = 1
- # exclude self connections:
- diag_indices = np.arange(n_vertices)
- X[diag_indices, diag_indices] = np.inf
- while num_visited != n_vertices:
- new_edge = np.argmin(X[visited_vertices], axis=None)
- # 2d encoding of new_edge from flat, get correct indices
- new_edge = divmod(new_edge, n_vertices)
- new_edge = [visited_vertices[new_edge[0]], new_edge[1]]
- # add edge to tree
- spanning_edges.append(new_edge)
- visited_vertices.append(new_edge[1])
- # remove all edges inside current tree
- X[visited_vertices, new_edge[1]] = np.inf
- X[new_edge[1], visited_vertices] = np.inf
- num_visited += 1
- return np.vstack(spanning_edges)
-def polar_to_cartesian(arr, r):
- a = np.concatenate((np.array([2 * np.pi]), arr))
- si = np.sin(a)
- si[0] = 1
- si = np.cumprod(si)
- co = np.cos(a)
- co = np.roll(co, -1)
- return si * co * r
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment