import html
import socket
import threading

from flask import Flask, request, render_template_string
# Flask application object; the @app.route handlers in this file register on it.
app = Flask(__name__)
@app.route('/', methods=["GET"])
def source():
    """Serve this file's own source code as an HTML page.

    Returns:
        The text of ``__file__``, HTML-escaped and wrapped in a ``<pre>``
        element.
    """
    with open(__file__, 'r', encoding='utf-8') as f:
        # Escape the text so the source is shown literally rather than
        # being interpreted as markup by the browser.
        return '<pre>' + html.escape(f.read()) + '</pre>'
@app.route('/', methods=["POST"])
def template():
    """Render the template text submitted in the ``code`` form field.

    The rendered output is printed on the server side only; the HTTP
    response carries a short status string.

    Returns:
        ``('error', 400)`` when the ``code`` field is absent,
        ``"Forbidden content detected!"`` when the text contains a
        blacklisted substring, otherwise ``'ok'`` (or ``'error'`` if
        rendering produced ``None``).
    """
    template_code = request.form.get("code")
    if template_code is None:
        # Without this guard the substring checks below raise TypeError
        # ("argument of type 'NoneType' is not iterable").
        return 'error', 400

    # Security filter
    blacklist = ['__', 'import', 'os', 'sys', 'eval', 'subprocess', 'popen', 'system', '\r', '\n']
    if any(black in template_code for black in blacklist):
        return "Forbidden content detected!"

    # NOTE(review): this renders caller-supplied template source, i.e.
    # server-side template injection. A substring blacklist is not a
    # sandbox; do not expose this endpoint to untrusted clients.
    result = render_template_string(template_code)
    print(result)
    return 'ok' if result is not None else 'error'
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as proxy_socket: proxy_socket.connect((self.target_host, self.target_port)) proxy_socket.sendall(request_data)
header_end = response_data.rfind(b"\r\n\r\n") if header_end != -1: body = response_data[header_end + 4:] else: body = response_data response_body = body response = b"HTTP/1.1 200 OK\r\n" \ b"Content-Length: " + str(len(response_body)).encode() + b"\r\n" \ b"Content-Type: text/html; charset=utf-8\r\n" \ b"\r\n" + response_body
client_socket.sendall(response) except Exception as e: print(f"Proxy Error: {e}") finally: client_socket.close()
def start_proxy_server(host, port, target_host, target_port):
    """Create the proxy handler and open the listening TCP socket.

    Args:
        host: Local address the listening socket binds to.
        port: Local port the listening socket binds to.
        target_host: Forwarding target host, passed to ``HTTPProxyHandler``.
        target_port: Forwarding target port, passed to ``HTTPProxyHandler``.
    """
    proxy_handler = HTTPProxyHandler(target_host, target_port)
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind((host, port))
    server_socket.listen(100)  # backlog of up to 100 pending connections
    print(f"Proxy server is running on {host}:{port} and forwarding to {target_host}:{target_port}...")
    # NOTE(review): no accept loop follows in this block, so proxy_handler
    # and server_socket are not used after this point; confirm whether the
    # remainder of the function is missing.
# Stores the three required keyword arguments in instance variables.
#
# name    - value stored in @name
# age     - value stored in @age
# details - value stored in @details
def initialize(name:, age:, details:)
  @name = name
  @age = age
  @details = details
end
# Class-level reader: returns the current value of the class variable @@url.
def self.url
  @@url
end
# Merges the entries of +additional+ into this object.
#
# additional - enumerable of key/value pairs (iterated with #each)
#
# Returns the result of recursive_merge(self, additional).
def merge_with(additional)
  recursive_merge(self, additional)
end
private
defrecursive_merge(original, additional, current_obj = original) additional.each do |key, value| if value.is_a?(Hash) if current_obj.respond_to?(key) next_obj = current_obj.public_send(key) recursive_merge(original, value, next_obj) else new_object = Object.new current_obj.instance_variable_set("@#{key}", new_object) current_obj.singleton_class.attr_accessor key end else if current_obj.is_a?(Hash) current_obj[key] = value else current_obj.instance_variable_set("@#{key}", value) current_obj.singleton_class.attr_accessor key end end end original end end
# Person subclass whose constructor forwards its three keyword arguments
# to Person#initialize unchanged.
class User < Person
  def initialize(name:, age:, details:)
    super(name: name, age: age, details: details)
  end
end
# GET /launch-curl-command - Activates the first gadget
get '/launch-curl-command' do
  content_type :json

  # This gadget makes an HTTP request to the URL returned by Person.url
  if Person.respond_to?(:url)
    url = Person.url
    # Only the request itself matters here; the response is not inspected.
    Net::HTTP.get_response(URI(url))
    { status: 'HTTP request made', url: url }.to_json
  else
    { status: 'Failed to access URL variable' }.to_json
  end
end

# GET /sign_with_subclass_key - Signs a fixed payload with
# KeySigner.signing_key and returns the key together with the signature.
get '/sign_with_subclass_key' do
  content_type :json
  signer = KeySigner.new
  signed_data = signer.sign(KeySigner.signing_key, "data-to-sign")

  { status: 'Data signed', signing_key: KeySigner.signing_key, signed_data: signed_data }.to_json
end

# GET /check-infected-vars - Reports the current values of Person.url and
# KeySigner.signing_key.
get '/check-infected-vars' do
  content_type :json

  { user_url: Person.url, signing_key: KeySigner.signing_key }.to_json
end
get('/') do erb :hello end run! if app_file == $0 end